From 1e27c802c89362fd00009f2359730508a840e4af Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Fri, 2 Feb 2024 23:02:12 +0000 Subject: common_merge_chain_optimalization_minimum_size hardcoded at 64 --- storage/src/tests/persistence/mergehandlertest.cpp | 4 +- .../src/vespa/storage/persistence/mergehandler.cpp | 97 ++++++++-------------- .../src/vespa/storage/persistence/mergehandler.h | 44 +++------- .../storage/persistence/persistencehandler.cpp | 4 +- 4 files changed, 50 insertions(+), 99 deletions(-) (limited to 'storage') diff --git a/storage/src/tests/persistence/mergehandlertest.cpp b/storage/src/tests/persistence/mergehandlertest.cpp index f79235ae505..4bd0570efa8 100644 --- a/storage/src/tests/persistence/mergehandlertest.cpp +++ b/storage/src/tests/persistence/mergehandlertest.cpp @@ -170,11 +170,11 @@ struct MergeHandlerTest : PersistenceTestUtils { MergeHandler createHandler(size_t maxChunkSize = 0x400000) { return MergeHandler(getEnv(), getPersistenceProvider(), - getEnv()._component.cluster_context(), getEnv()._component.getClock(), *_sequenceTaskExecutor, maxChunkSize, 64); + getEnv()._component.cluster_context(), getEnv()._component.getClock(), *_sequenceTaskExecutor, maxChunkSize); } MergeHandler createHandler(spi::PersistenceProvider & spi) { return MergeHandler(getEnv(), spi, - getEnv()._component.cluster_context(), getEnv()._component.getClock(), *_sequenceTaskExecutor, 4190208, 64); + getEnv()._component.cluster_context(), getEnv()._component.getClock(), *_sequenceTaskExecutor, 4190208); } std::shared_ptr get_queued_reply() { diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp index b1e36147e30..7ee2d9f37bf 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.cpp +++ b/storage/src/vespa/storage/persistence/mergehandler.cpp @@ -27,15 +27,13 @@ namespace storage { MergeHandler::MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi, const ClusterContext& cluster_context, const framework::Clock & clock, vespalib::ISequencedTaskExecutor& executor, - uint32_t maxChunkSize, - uint32_t commonMergeChainOptimalizationMinimumSize) + uint32_t maxChunkSize) : _clock(clock), _cluster_context(cluster_context), _env(env), _spi(spi), _monitored_ref_count(std::make_unique()), _maxChunkSize(maxChunkSize), - _commonMergeChainOptimalizationMinimumSize(commonMergeChainOptimalizationMinimumSize), _executor(executor) { } @@ -48,6 +46,8 @@ MergeHandler::~MergeHandler() namespace { +constexpr uint32_t COMMON_MERGE_CHAIN_OPTIMIZATION_SIZE = 64u; + constexpr int getDeleteFlag() { // Referred into old slotfile code before. Where should this number come from? return 2; @@ -175,7 +175,7 @@ MergeHandler::buildBucketInfoList( std::vector& output, spi::Context& context) const { - assert(output.size() == 0); + assert(output.empty()); assert(myNodeIndex < 16); uint32_t oldSize = output.size(); using DbBucketInfo = api::BucketInfo; @@ -487,13 +487,12 @@ MergeHandler::fetchLocalData( } document::Document::UP -MergeHandler::deserializeDiffDocument( - const api::ApplyBucketDiffCommand::Entry& e, - const document::DocumentTypeRepo& repo) const +MergeHandler::deserializeDiffDocument(const api::ApplyBucketDiffCommand::Entry& e, + const document::DocumentTypeRepo& repo) const { auto doc = std::make_unique(); vespalib::nbostream hbuf(&e._headerBlob[0], e._headerBlob.size()); - if (e._bodyBlob.size() > 0) { + if (!e._bodyBlob.empty()) { // TODO Remove this branch and add warning on error. vespalib::nbostream bbuf(&e._bodyBlob[0], e._bodyBlob.size()); doc->deserialize(repo, hbuf, bbuf); @@ -533,17 +532,13 @@ MergeHandler::applyDiffEntry(std::shared_ptr async_results * Apply the diffs needed locally. */ void -MergeHandler::applyDiffLocally( - const spi::Bucket& bucket, - std::vector& diff, - uint8_t nodeIndex, - spi::Context& context, - std::shared_ptr async_results) const +MergeHandler::applyDiffLocally(const spi::Bucket& bucket, std::vector& diff, + uint8_t nodeIndex, spi::Context& context, + const std::shared_ptr & async_results) const { // Sort the data to apply by which file they should be added to LOG(spam, "Merge(%s): Applying data locally. Diff has %zu entries", - bucket.toString().c_str(), - diff.size()); + bucket.toString().c_str(), diff.size()); uint32_t nodeMask = 1 << nodeIndex; uint32_t byteCount = 0; uint32_t addedCount = 0; @@ -563,9 +558,8 @@ MergeHandler::applyDiffLocally( if (spi::Timestamp(e._entry._timestamp) > existing.getTimestamp()) { ++j; - LOG(spam, "ApplyBucketDiff(%s): slot %s not in diff and " - "already present in persistence", bucket.toString().c_str(), - existing.toString().c_str()); + LOG(spam, "ApplyBucketDiff(%s): slot %s not in diff and already present in persistence", + bucket.toString().c_str(), existing.toString().c_str()); continue; } if ((e._entry._hasMask & nodeMask) != 0) { @@ -576,8 +570,7 @@ MergeHandler::applyDiffLocally( } if (!e.filled()) { ++i; - LOG(debug, "Failed to apply unretrieved entry %s to diff " - "locally on %s. Entry was probably compacted away.", + LOG(debug, "Failed to apply unretrieved entry %s to diff locally on %s. Entry was probably compacted away.", e.toString().c_str(), bucket.toString().c_str()); continue; } @@ -596,19 +589,14 @@ MergeHandler::applyDiffLocally( ++i; ++j; if ((e._entry._flags & DELETED) && !existing.isRemove()) { - LOG(debug, "Slot in diff is remove for existing " - "timestamp in %s. Diff slot: %s. Existing slot: %s", - bucket.toString().c_str(), e.toString().c_str(), - existing.toString().c_str()); + LOG(debug, "Slot in diff is remove for existing timestamp in %s. Diff slot: %s. Existing slot: %s", + bucket.toString().c_str(), e.toString().c_str(), existing.toString().c_str()); applyDiffEntry(async_results, bucket, e, repo); } else { // Duplicate put, just ignore it. - LOG(debug, "During diff apply, attempting to add slot " - "whose timestamp already exists in %s, but assuming " - "these are for the same entry--ignoring it. " - "Diff slot: %s. Existing slot: %s", - bucket.toString().c_str(), e.toString().c_str(), - existing.toString().c_str()); + LOG(debug, "During diff apply, attempting to add slot whose timestamp already exists in %s, " + " but assuming these are for the same entry--ignoring it. Diff slot: %s. Existing slot: %s", + bucket.toString().c_str(), e.toString().c_str(), existing.toString().c_str()); } continue; } @@ -623,8 +611,7 @@ MergeHandler::applyDiffLocally( continue; } if (!e.filled()) { - LOG(debug, "Failed to apply unretrieved entry %s to diff " - "locally on %s. Entry was probably compacted away.", + LOG(debug, "Failed to apply unretrieved entry %s to diff locally on %s. Entry was probably compacted away.", e.toString().c_str(), bucket.toString().c_str()); continue; } @@ -650,19 +637,13 @@ MergeHandler::sync_bucket_info(const spi::Bucket& bucket) const spi::BucketInfoResult infoResult(_spi.getBucketInfo(bucket)); if (infoResult.getErrorCode() != spi::Result::ErrorType::NONE) { LOG(warning, "Failed to get bucket info for %s: %s", - bucket.toString().c_str(), - infoResult.getErrorMessage().c_str()); - throw std::runtime_error("Failed to invoke getBucketInfo on " - "persistence provider"); + bucket.toString().c_str(), infoResult.getErrorMessage().c_str()); + throw std::runtime_error("Failed to invoke getBucketInfo on persistence provider"); } const spi::BucketInfo& tmpInfo(infoResult.getBucketInfo()); - api::BucketInfo providerInfo(tmpInfo.getChecksum(), - tmpInfo.getDocumentCount(), - tmpInfo.getDocumentSize(), - tmpInfo.getEntryCount(), - tmpInfo.getUsedSize(), - tmpInfo.isReady(), - tmpInfo.isActive()); + api::BucketInfo providerInfo(tmpInfo.getChecksum(), tmpInfo.getDocumentCount(), tmpInfo.getDocumentSize(), + tmpInfo.getEntryCount(), tmpInfo.getUsedSize(), + tmpInfo.isReady(), tmpInfo.isActive()); _env.updateBucketDatabase(bucket.getBucket(), providerInfo); } @@ -698,9 +679,7 @@ MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status, // If last action failed, fail the whole merge if (status.reply->getResult().failed()) { LOG(warning, "Done with merge of %s (failed: %s) %s", - bucket.toString().c_str(), - status.reply->getResult().toString().c_str(), - status.toString().c_str()); + bucket.toString().c_str(), status.reply->getResult().toString().c_str(), status.toString().c_str()); return status.reply; } @@ -732,13 +711,9 @@ MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status, cmd = std::make_shared(bucket.getBucket(), nodes); cmd->setAddress(createAddress(_cluster_context.cluster_name_ptr(), nodes[1].index)); - findCandidates(status, - active_nodes_mask, - true, - 1 << (status.nodeList.size() - 1), - 1 << (nodes.size() - 1), - *cmd); - if (cmd->getDiff().size() != 0) { + findCandidates(status, active_nodes_mask, true, 1 << (status.nodeList.size() - 1), + 1 << (nodes.size() - 1), *cmd); + if (!cmd->getDiff().empty()) { break; } cmd.reset(); @@ -748,8 +723,7 @@ MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status, active_nodes_mask = (1u << status.nodeList.size()) - 1; // If only one node left in the merge, return ok. if (status.nodeList.size() == 1) { - LOG(debug, "Done with merge of %s as there is only one node " - "that is not source only left in the merge.", + LOG(debug, "Done with merge of %s as there is only one node that is not source only left in the merge.", bucket.toString().c_str()); return status.reply; } @@ -777,11 +751,8 @@ MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status, if (e.first == 0u) { continue; } - if (e.second >= uint32_t(_commonMergeChainOptimalizationMinimumSize) - || counts.size() == 1) - { - LOG(spam, "Sending separate apply bucket diff for path %x " - "with size %u", + if ((e.second >= COMMON_MERGE_CHAIN_OPTIMIZATION_SIZE) || (counts.size() == 1)) { + LOG(spam, "Sending separate apply bucket diff for path %x with size %u", e.first, e.second); std::vector nodes; // This node always has to be first in chain. @@ -837,7 +808,7 @@ MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status, status.pendingId = cmd->getMsgId(); LOG(debug, "Sending %s", cmd->toString().c_str()); sender.sendCommand(cmd); - return api::StorageReply::SP(); + return {}; } /** Ensures merge states are deleted if we fail operation */ @@ -1203,7 +1174,7 @@ MergeHandler::handleGetBucketDiffReply(api::GetBucketDiffReply& reply, MessageSe assert(reply.getNodes().size() >= 2); // Get bucket diff should retrieve all info at once - assert(s->diff.size() == 0); + assert(s->diff.empty()); s->diff.insert(s->diff.end(), reply.getDiff().begin(), reply.getDiff().end()); diff --git a/storage/src/vespa/storage/persistence/mergehandler.h b/storage/src/vespa/storage/persistence/mergehandler.h index 0f9f01b4cb1..f3bef802229 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.h +++ b/storage/src/vespa/storage/persistence/mergehandler.h @@ -51,26 +51,17 @@ public: MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi, const ClusterContext& cluster_context, const framework::Clock & clock, vespalib::ISequencedTaskExecutor& executor, - uint32_t maxChunkSize = 4190208, - uint32_t commonMergeChainOptimalizationMinimumSize = 64); + uint32_t maxChunkSize = 4190208); ~MergeHandler() override; - bool buildBucketInfoList( - const spi::Bucket& bucket, - Timestamp maxTimestamp, - uint8_t myNodeIndex, - std::vector& output, - spi::Context& context) const; - void fetchLocalData(const spi::Bucket& bucket, - std::vector& diff, - uint8_t nodeIndex, - spi::Context& context) const; - void applyDiffLocally(const spi::Bucket& bucket, - std::vector& diff, - uint8_t nodeIndex, - spi::Context& context, - std::shared_ptr async_results) const; + bool buildBucketInfoList(const spi::Bucket& bucket, Timestamp maxTimestamp, uint8_t myNodeIndex, + std::vector& output, spi::Context& context) const; + void fetchLocalData(const spi::Bucket& bucket, std::vector& diff, + uint8_t nodeIndex, spi::Context& context) const; + void applyDiffLocally(const spi::Bucket& bucket, std::vector& diff, + uint8_t nodeIndex, spi::Context& context, + const std::shared_ptr & async_results) const; void sync_bucket_info(const spi::Bucket& bucket) const override; void schedule_delayed_delete(std::unique_ptr) const override; @@ -89,35 +80,26 @@ private: spi::PersistenceProvider &_spi; std::unique_ptr _monitored_ref_count; const uint32_t _maxChunkSize; - const uint32_t _commonMergeChainOptimalizationMinimumSize; vespalib::ISequencedTaskExecutor& _executor; MessageTrackerUP handleGetBucketDiffStage2(api::GetBucketDiffCommand&, MessageTrackerUP) const; /** Returns a reply if merge is complete */ - api::StorageReply::SP processBucketMerge(const spi::Bucket& bucket, - MergeStatus& status, - MessageSender& sender, - spi::Context& context, - std::shared_ptr& async_results) const; + api::StorageReply::SP processBucketMerge(const spi::Bucket& bucket, MergeStatus& status, MessageSender& sender, + spi::Context& context, std::shared_ptr& async_results) const; /** * Invoke either put, remove or unrevertable remove on the SPI * depending on the flags in the diff entry. */ - void applyDiffEntry(std::shared_ptr async_results, - const spi::Bucket&, - const api::ApplyBucketDiffCommand::Entry&, - const document::DocumentTypeRepo& repo) const; + void applyDiffEntry(std::shared_ptr async_results, const spi::Bucket&, + const api::ApplyBucketDiffCommand::Entry&, const document::DocumentTypeRepo& repo) const; /** * Fill entries-vector with metadata for bucket up to maxTimestamp, * sorted ascendingly on entry timestamp. * Throws std::runtime_error upon iteration failure. */ - void populateMetaData(const spi::Bucket&, - Timestamp maxTimestamp, - DocEntryList & entries, - spi::Context& context) const; + void populateMetaData(const spi::Bucket&, Timestamp maxTimestamp, DocEntryList & entries, spi::Context& context) const; std::unique_ptr deserializeDiffDocument(const api::ApplyBucketDiffCommand::Entry& e, const document::DocumentTypeRepo& repo) const; diff --git a/storage/src/vespa/storage/persistence/persistencehandler.cpp b/storage/src/vespa/storage/persistence/persistencehandler.cpp index 2f7fbb99290..87c1f83794e 100644 --- a/storage/src/vespa/storage/persistence/persistencehandler.cpp +++ b/storage/src/vespa/storage/persistence/persistencehandler.cpp @@ -19,9 +19,7 @@ PersistenceHandler::PersistenceHandler(vespalib::ISequencedTaskExecutor & sequen : _clock(component.getClock()), _env(component, filestorHandler, metrics, provider), _processAllHandler(_env, provider), - _mergeHandler(_env, provider, component.cluster_context(), _clock, sequencedExecutor, - cfg.bucketMergeChunkSize, - cfg.commonMergeChainOptimalizationMinimumSize), + _mergeHandler(_env, provider, component.cluster_context(), _clock, sequencedExecutor, cfg.bucketMergeChunkSize), _asyncHandler(_env, provider, bucketOwnershipNotifier, sequencedExecutor, component.getBucketIdFactory()), _splitJoinHandler(_env, provider, bucketOwnershipNotifier, cfg.enableMultibitSplitOptimalization), _simpleHandler(_env, provider, component.getBucketIdFactory()) -- cgit v1.2.3