author    Henning Baldersheim <balder@yahoo-inc.com>    2024-02-05 11:21:23 +0100
committer GitHub <noreply@github.com>                   2024-02-05 11:21:23 +0100
commit    b2fc8e4051afa7c5398ba198f34d6141e6a07fed (patch)
tree      07f861765d08ca4c2028cce860c08fa01c8e054a /storage
parent    98aece8d30708d24285c6e8bf2473061cdc36b36 (diff)
parent    1e27c802c89362fd00009f2359730508a840e4af (diff)
Merge pull request #30161 from vespa-engine/balder/throttle_individual_merge_feed_ops_and_common_merge_chain_optimalization
Balder/throttle individual merge feed ops and common merge chain optimalization
Diffstat (limited to 'storage')
6 files changed, 58 insertions(+), 138 deletions(-)
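The functional core of this merge is in mergehandler.cpp: applyDiffEntry() now unconditionally acquires a token from the file-stor handler's operation throttler before feeding a merge entry to the persistence provider, where it previously consulted the throttle_merge_feed_ops flag and could skip throttling with an empty token. The sketch below illustrates the RAII token pattern this relies on; SimpleThrottler is a hypothetical stand-in for vespalib::SharedOperationThrottler, whose real interface is not part of this diff.

    // Minimal sketch of the RAII throttling pattern, assuming a counting
    // semaphore semantic. SimpleThrottler is a hypothetical stand-in for
    // vespalib::SharedOperationThrottler; names and layout are illustrative.
    #include <condition_variable>
    #include <cstdint>
    #include <mutex>

    class SimpleThrottler {
    public:
        // RAII token: holding one means the operation has been admitted;
        // destroying it hands the slot to the next waiter.
        class Token {
        public:
            explicit Token(SimpleThrottler* t) noexcept : _throttler(t) {}
            Token(Token&& rhs) noexcept : _throttler(rhs._throttler) { rhs._throttler = nullptr; }
            Token(const Token&) = delete;
            Token& operator=(const Token&) = delete;
            ~Token() { if (_throttler) { _throttler->release_one(); } }
        private:
            SimpleThrottler* _throttler;
        };

        explicit SimpleThrottler(uint32_t max_pending) noexcept
            : _max_pending(max_pending), _pending(0) {}

        // Blocks the calling thread until an operation slot is free.
        Token blocking_acquire_one() {
            std::unique_lock<std::mutex> lock(_mutex);
            _cond.wait(lock, [this] { return _pending < _max_pending; });
            ++_pending;
            return Token(this);
        }

    private:
        void release_one() {
            {
                std::lock_guard<std::mutex> guard(_mutex);
                --_pending;
            }
            _cond.notify_one();
        }

        std::mutex              _mutex;
        std::condition_variable _cond;
        const uint32_t          _max_pending;
        uint32_t                _pending;
    };

Because the token releases its slot in its destructor, every exit path from applyDiffEntry(), whether success, provider error, or exception, returns capacity to the throttler without explicit bookkeeping.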
diff --git a/storage/src/tests/persistence/mergehandlertest.cpp b/storage/src/tests/persistence/mergehandlertest.cpp
index f79235ae505..4bd0570efa8 100644
--- a/storage/src/tests/persistence/mergehandlertest.cpp
+++ b/storage/src/tests/persistence/mergehandlertest.cpp
@@ -170,11 +170,11 @@ struct MergeHandlerTest : PersistenceTestUtils {
     MergeHandler createHandler(size_t maxChunkSize = 0x400000) {
         return MergeHandler(getEnv(), getPersistenceProvider(),
-                            getEnv()._component.cluster_context(), getEnv()._component.getClock(), *_sequenceTaskExecutor, maxChunkSize, 64);
+                            getEnv()._component.cluster_context(), getEnv()._component.getClock(), *_sequenceTaskExecutor, maxChunkSize);
     }
     MergeHandler createHandler(spi::PersistenceProvider & spi) {
         return MergeHandler(getEnv(), spi,
-                            getEnv()._component.cluster_context(), getEnv()._component.getClock(), *_sequenceTaskExecutor, 4190208, 64);
+                            getEnv()._component.cluster_context(), getEnv()._component.getClock(), *_sequenceTaskExecutor, 4190208);
     }
     std::shared_ptr<api::StorageMessage> get_queued_reply() {
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
index 90703050009..093c11fb913 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
@@ -25,7 +25,6 @@
 #include <vespa/vespalib/util/sequencedtaskexecutor.h>
 #include <vespa/vespalib/util/string_escape.h>
 #include <vespa/vespalib/util/stringfmt.h>
-#include <vespa/config/subscription/configuri.h>
 #include <vespa/config/helper/configfetcher.hpp>
 #include <thread>

@@ -49,7 +48,7 @@ namespace {
 class BucketExecutorWrapper : public spi::BucketExecutor {
 public:
-    BucketExecutorWrapper(spi::BucketExecutor & executor) noexcept : _executor(executor) { }
+    explicit BucketExecutorWrapper(spi::BucketExecutor & executor) noexcept : _executor(executor) { }

     void execute(const spi::Bucket &bucket, std::unique_ptr<spi::BucketTask> task) override {
         _executor.execute(bucket, std::move(task));
     }
@@ -213,7 +212,6 @@ FileStorManager::on_configure(const StorFilestorConfig& config)
     _use_async_message_handling_on_schedule = config.useAsyncMessageHandlingOnSchedule;
     _host_info_reporter.set_noise_level(config.resourceUsageReporterNoiseLevel);
     const bool use_dynamic_throttling = (config.asyncOperationThrottler.type == StorFilestorConfig::AsyncOperationThrottler::Type::DYNAMIC);
-    const bool throttle_merge_feed_ops = config.asyncOperationThrottler.throttleIndividualMergeFeedOps;

     if (!liveUpdate) {
         _config = std::make_unique<StorFilestorConfig>(config);
@@ -243,11 +241,7 @@ FileStorManager::on_configure(const StorFilestorConfig& config)
     // TODO remove once desired throttling behavior is set in stone
     {
         _filestorHandler->use_dynamic_operation_throttling(use_dynamic_throttling);
-        _filestorHandler->set_throttle_apply_bucket_diff_ops(!throttle_merge_feed_ops);
-        std::lock_guard guard(_lock);
-        for (auto& ph : _persistenceHandlers) {
-            ph->set_throttle_merge_feed_ops(throttle_merge_feed_ops);
-        }
+        _filestorHandler->set_throttle_apply_bucket_diff_ops(false);
     }
 }
@@ -312,7 +306,7 @@ FileStorManager::mapOperationToBucketAndDisk(api::BucketCommand& cmd, const docu
         if (results.size() > 1) {
             error << "Bucket was inconsistent with " << results.size() << " entries so no automatic remapping done:";
-            BucketMap::const_iterator it = results.begin();
+            auto it = results.begin();
             for (uint32_t i=0; i <= 4 && it != results.end(); ++it, ++i)
             {
                 error << " " << it->first;
             }
@@ -551,10 +545,7 @@ FileStorManager::onDeleteBucket(const shared_ptr<api::DeleteBucketCommand>& cmd)

 StorBucketDatabase::WrappedEntry
-FileStorManager::ensureConsistentBucket(
-        const document::Bucket& bucket,
-        api::StorageMessage& msg,
-        const char* callerId)
+FileStorManager::ensureConsistentBucket(const document::Bucket& bucket, api::StorageMessage& msg, const char* callerId)
 {
     StorBucketDatabase::WrappedEntry entry(_component.getBucketDatabase(bucket.getBucketSpace()).get(
                 bucket.getBucketId(), callerId, StorBucketDatabase::CREATE_IF_NONEXISTING));
@@ -565,7 +556,7 @@ FileStorManager::ensureConsistentBucket(
             entry.remove();
         }
         replyDroppedOperation(msg, bucket, api::ReturnCode::ABORTED, "bucket is inconsistently split");
-        return StorBucketDatabase::WrappedEntry();
+        return {};
     }

     return entry;
@@ -899,7 +890,7 @@ FileStorManager::maintenance_in_all_spaces(const lib::Node& node) const noexcept
         if (!derived_cluster_state->getNodeState(node).getState().oneOf("m")) {
             return false;
         }
-    };
+    }
     return true;
 }
diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp
index 1b7041583e8..7ee2d9f37bf 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.cpp
+++ b/storage/src/vespa/storage/persistence/mergehandler.cpp
@@ -2,7 +2,6 @@
 #include "mergehandler.h"
 #include "persistenceutil.h"
-#include "shared_operation_throttler.h"
 #include "apply_bucket_diff_entry_complete.h"
 #include "apply_bucket_diff_state.h"
 #include <vespa/storage/persistence/filestorage/mergestatus.h>
@@ -28,17 +27,14 @@ namespace storage {

 MergeHandler::MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi,
                            const ClusterContext& cluster_context, const framework::Clock & clock,
                            vespalib::ISequencedTaskExecutor& executor,
-                           uint32_t maxChunkSize,
-                           uint32_t commonMergeChainOptimalizationMinimumSize)
+                           uint32_t maxChunkSize)
     : _clock(clock),
       _cluster_context(cluster_context),
       _env(env),
       _spi(spi),
       _monitored_ref_count(std::make_unique<MonitoredRefCount>()),
       _maxChunkSize(maxChunkSize),
-      _commonMergeChainOptimalizationMinimumSize(commonMergeChainOptimalizationMinimumSize),
-      _executor(executor),
-      _throttle_merge_feed_ops(true)
+      _executor(executor)
 {
 }
@@ -50,6 +46,8 @@ MergeHandler::~MergeHandler()

 namespace {

+constexpr uint32_t COMMON_MERGE_CHAIN_OPTIMIZATION_SIZE = 64u;
+
 constexpr int getDeleteFlag() {
     // Referred into old slotfile code before. Where should this number come from?
     return 2;
 }
@@ -177,7 +175,7 @@ MergeHandler::buildBucketInfoList(
             std::vector<api::GetBucketDiffCommand::Entry>& output,
             spi::Context& context) const
 {
-    assert(output.size() == 0);
+    assert(output.empty());
     assert(myNodeIndex < 16);
     uint32_t oldSize = output.size();
     using DbBucketInfo = api::BucketInfo;
@@ -489,13 +487,12 @@ MergeHandler::fetchLocalData(
 }

 document::Document::UP
-MergeHandler::deserializeDiffDocument(
-        const api::ApplyBucketDiffCommand::Entry& e,
-        const document::DocumentTypeRepo& repo) const
+MergeHandler::deserializeDiffDocument(const api::ApplyBucketDiffCommand::Entry& e,
+                                      const document::DocumentTypeRepo& repo) const
 {
     auto doc = std::make_unique<document::Document>();
     vespalib::nbostream hbuf(&e._headerBlob[0], e._headerBlob.size());
-    if (e._bodyBlob.size() > 0) {
+    if (!e._bodyBlob.empty()) {
         // TODO Remove this branch and add warning on error.
         vespalib::nbostream bbuf(&e._bodyBlob[0], e._bodyBlob.size());
         doc->deserialize(repo, hbuf, bbuf);
@@ -511,8 +508,7 @@ MergeHandler::applyDiffEntry(std::shared_ptr<ApplyBucketDiffState> async_results
                              const api::ApplyBucketDiffCommand::Entry& e,
                              const document::DocumentTypeRepo& repo) const
 {
-    auto throttle_token = throttle_merge_feed_ops() ? _env._fileStorHandler.operation_throttler().blocking_acquire_one()
-                                                    : vespalib::SharedOperationThrottler::Token();
+    auto throttle_token = _env._fileStorHandler.operation_throttler().blocking_acquire_one();
     spi::Timestamp timestamp(e._entry._timestamp);
     if (!(e._entry._flags & (DELETED | DELETED_IN_PLACE))) {
         // Regular put entry
@@ -536,17 +532,13 @@
  * Apply the diffs needed locally.
  */
 void
-MergeHandler::applyDiffLocally(
-        const spi::Bucket& bucket,
-        std::vector<api::ApplyBucketDiffCommand::Entry>& diff,
-        uint8_t nodeIndex,
-        spi::Context& context,
-        std::shared_ptr<ApplyBucketDiffState> async_results) const
+MergeHandler::applyDiffLocally(const spi::Bucket& bucket, std::vector<api::ApplyBucketDiffCommand::Entry>& diff,
+                               uint8_t nodeIndex, spi::Context& context,
+                               const std::shared_ptr<ApplyBucketDiffState> & async_results) const
 {
     // Sort the data to apply by which file they should be added to
     LOG(spam, "Merge(%s): Applying data locally. Diff has %zu entries",
-        bucket.toString().c_str(),
-        diff.size());
+        bucket.toString().c_str(), diff.size());
     uint32_t nodeMask = 1 << nodeIndex;
     uint32_t byteCount = 0;
     uint32_t addedCount = 0;
@@ -566,9 +558,8 @@ MergeHandler::applyDiffLocally(
             if (spi::Timestamp(e._entry._timestamp) > existing.getTimestamp()) {
                 ++j;
-                LOG(spam, "ApplyBucketDiff(%s): slot %s not in diff and "
-                    "already present in persistence", bucket.toString().c_str(),
-                    existing.toString().c_str());
+                LOG(spam, "ApplyBucketDiff(%s): slot %s not in diff and already present in persistence",
+                    bucket.toString().c_str(), existing.toString().c_str());
                 continue;
             }
             if ((e._entry._hasMask & nodeMask) != 0) {
@@ -579,8 +570,7 @@ MergeHandler::applyDiffLocally(
             }
             if (!e.filled()) {
                 ++i;
-                LOG(debug, "Failed to apply unretrieved entry %s to diff "
-                    "locally on %s. Entry was probably compacted away.",
+                LOG(debug, "Failed to apply unretrieved entry %s to diff locally on %s. Entry was probably compacted away.",
                     e.toString().c_str(), bucket.toString().c_str());
                 continue;
             }
@@ -599,19 +589,14 @@ MergeHandler::applyDiffLocally(
             ++i;
             ++j;
             if ((e._entry._flags & DELETED) && !existing.isRemove()) {
-                LOG(debug, "Slot in diff is remove for existing "
-                    "timestamp in %s. Diff slot: %s. Existing slot: %s",
-                    bucket.toString().c_str(), e.toString().c_str(),
-                    existing.toString().c_str());
+                LOG(debug, "Slot in diff is remove for existing timestamp in %s. Diff slot: %s. Existing slot: %s",
+                    bucket.toString().c_str(), e.toString().c_str(), existing.toString().c_str());
                 applyDiffEntry(async_results, bucket, e, repo);
             } else {
                 // Duplicate put, just ignore it.
-                LOG(debug, "During diff apply, attempting to add slot "
-                    "whose timestamp already exists in %s, but assuming "
-                    "these are for the same entry--ignoring it. "
-                    "Diff slot: %s. Existing slot: %s",
-                    bucket.toString().c_str(), e.toString().c_str(),
-                    existing.toString().c_str());
+                LOG(debug, "During diff apply, attempting to add slot whose timestamp already exists in %s, "
+                    " but assuming these are for the same entry--ignoring it. Diff slot: %s. Existing slot: %s",
+                    bucket.toString().c_str(), e.toString().c_str(), existing.toString().c_str());
             }
             continue;
         }
@@ -626,8 +611,7 @@ MergeHandler::applyDiffLocally(
             continue;
         }
         if (!e.filled()) {
-            LOG(debug, "Failed to apply unretrieved entry %s to diff "
-                "locally on %s. Entry was probably compacted away.",
+            LOG(debug, "Failed to apply unretrieved entry %s to diff locally on %s. Entry was probably compacted away.",
                 e.toString().c_str(), bucket.toString().c_str());
             continue;
         }
@@ -653,19 +637,13 @@ MergeHandler::sync_bucket_info(const spi::Bucket& bucket) const
     spi::BucketInfoResult infoResult(_spi.getBucketInfo(bucket));
     if (infoResult.getErrorCode() != spi::Result::ErrorType::NONE) {
         LOG(warning, "Failed to get bucket info for %s: %s",
-            bucket.toString().c_str(),
-            infoResult.getErrorMessage().c_str());
-        throw std::runtime_error("Failed to invoke getBucketInfo on "
-                                 "persistence provider");
+            bucket.toString().c_str(), infoResult.getErrorMessage().c_str());
+        throw std::runtime_error("Failed to invoke getBucketInfo on persistence provider");
     }
     const spi::BucketInfo& tmpInfo(infoResult.getBucketInfo());
-    api::BucketInfo providerInfo(tmpInfo.getChecksum(),
-                                 tmpInfo.getDocumentCount(),
-                                 tmpInfo.getDocumentSize(),
-                                 tmpInfo.getEntryCount(),
-                                 tmpInfo.getUsedSize(),
-                                 tmpInfo.isReady(),
-                                 tmpInfo.isActive());
+    api::BucketInfo providerInfo(tmpInfo.getChecksum(), tmpInfo.getDocumentCount(), tmpInfo.getDocumentSize(),
+                                 tmpInfo.getEntryCount(), tmpInfo.getUsedSize(),
+                                 tmpInfo.isReady(), tmpInfo.isActive());

     _env.updateBucketDatabase(bucket.getBucket(), providerInfo);
 }
@@ -701,9 +679,7 @@ MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status,
     // If last action failed, fail the whole merge
     if (status.reply->getResult().failed()) {
         LOG(warning, "Done with merge of %s (failed: %s) %s",
-            bucket.toString().c_str(),
-            status.reply->getResult().toString().c_str(),
-            status.toString().c_str());
+            bucket.toString().c_str(), status.reply->getResult().toString().c_str(), status.toString().c_str());
         return status.reply;
     }
@@ -735,13 +711,9 @@ MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status,
             cmd = std::make_shared<api::ApplyBucketDiffCommand>(bucket.getBucket(), nodes);
             cmd->setAddress(createAddress(_cluster_context.cluster_name_ptr(), nodes[1].index));
-            findCandidates(status,
-                           active_nodes_mask,
-                           true,
-                           1 << (status.nodeList.size() - 1),
-                           1 << (nodes.size() - 1),
-                           *cmd);
-            if (cmd->getDiff().size() != 0) {
+            findCandidates(status, active_nodes_mask, true, 1 << (status.nodeList.size() - 1),
+                           1 << (nodes.size() - 1), *cmd);
+            if (!cmd->getDiff().empty()) {
                 break;
             }
             cmd.reset();
@@ -751,8 +723,7 @@ MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status,
         active_nodes_mask = (1u << status.nodeList.size()) - 1;
         // If only one node left in the merge, return ok.
         if (status.nodeList.size() == 1) {
-            LOG(debug, "Done with merge of %s as there is only one node "
-                "that is not source only left in the merge.",
+            LOG(debug, "Done with merge of %s as there is only one node that is not source only left in the merge.",
                 bucket.toString().c_str());
             return status.reply;
         }
@@ -780,11 +751,8 @@ MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status,
             if (e.first == 0u) {
                 continue;
             }
-            if (e.second >= uint32_t(_commonMergeChainOptimalizationMinimumSize)
-                || counts.size() == 1)
-            {
-                LOG(spam, "Sending separate apply bucket diff for path %x "
-                    "with size %u",
+            if ((e.second >= COMMON_MERGE_CHAIN_OPTIMIZATION_SIZE) || (counts.size() == 1)) {
+                LOG(spam, "Sending separate apply bucket diff for path %x with size %u",
                     e.first, e.second);
                 std::vector<api::MergeBucketCommand::Node> nodes;
                 // This node always has to be first in chain.
@@ -840,7 +808,7 @@ MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status,
     status.pendingId = cmd->getMsgId();
     LOG(debug, "Sending %s", cmd->toString().c_str());
     sender.sendCommand(cmd);
-    return api::StorageReply::SP();
+    return {};
 }

 /** Ensures merge states are deleted if we fail operation */
@@ -1206,7 +1174,7 @@ MergeHandler::handleGetBucketDiffReply(api::GetBucketDiffReply& reply, MessageSe
     assert(reply.getNodes().size() >= 2);

     // Get bucket diff should retrieve all info at once
-    assert(s->diff.size() == 0);
+    assert(s->diff.empty());
     s->diff.insert(s->diff.end(),
                    reply.getDiff().begin(),
                    reply.getDiff().end());
diff --git a/storage/src/vespa/storage/persistence/mergehandler.h b/storage/src/vespa/storage/persistence/mergehandler.h
index 43b51662fe6..f3bef802229 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.h
+++ b/storage/src/vespa/storage/persistence/mergehandler.h
@@ -20,7 +20,6 @@
 #include <vespa/storage/common/messagesender.h>
 #include <vespa/vespalib/util/monitored_refcount.h>
 #include <vespa/storageframework/generic/clock/time.h>
-#include <atomic>

 namespace vespalib { class ISequencedTaskExecutor; }
 namespace document { class Document; }
@@ -52,26 +51,17 @@ public:
     MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi,
                  const ClusterContext& cluster_context, const framework::Clock & clock,
                  vespalib::ISequencedTaskExecutor& executor,
-                 uint32_t maxChunkSize = 4190208,
-                 uint32_t commonMergeChainOptimalizationMinimumSize = 64);
+                 uint32_t maxChunkSize = 4190208);

     ~MergeHandler() override;

-    bool buildBucketInfoList(
-            const spi::Bucket& bucket,
-            Timestamp maxTimestamp,
-            uint8_t myNodeIndex,
-            std::vector<api::GetBucketDiffCommand::Entry>& output,
-            spi::Context& context) const;
-    void fetchLocalData(const spi::Bucket& bucket,
-                        std::vector<api::ApplyBucketDiffCommand::Entry>& diff,
-                        uint8_t nodeIndex,
-                        spi::Context& context) const;
-    void applyDiffLocally(const spi::Bucket& bucket,
-                          std::vector<api::ApplyBucketDiffCommand::Entry>& diff,
-                          uint8_t nodeIndex,
-                          spi::Context& context,
-                          std::shared_ptr<ApplyBucketDiffState> async_results) const;
+    bool buildBucketInfoList(const spi::Bucket& bucket, Timestamp maxTimestamp, uint8_t myNodeIndex,
+                             std::vector<api::GetBucketDiffCommand::Entry>& output, spi::Context& context) const;
+    void fetchLocalData(const spi::Bucket& bucket, std::vector<api::ApplyBucketDiffCommand::Entry>& diff,
+                        uint8_t nodeIndex, spi::Context& context) const;
+    void applyDiffLocally(const spi::Bucket& bucket, std::vector<api::ApplyBucketDiffCommand::Entry>& diff,
+                          uint8_t nodeIndex, spi::Context& context,
+                          const std::shared_ptr<ApplyBucketDiffState> & async_results) const;
     void sync_bucket_info(const spi::Bucket& bucket) const override;
     void schedule_delayed_delete(std::unique_ptr<ApplyBucketDiffState>) const override;
@@ -82,15 +72,6 @@ public:
     void handleApplyBucketDiffReply(api::ApplyBucketDiffReply&, MessageSender&, MessageTrackerUP) const;
     void drain_async_writes();

-    // Thread safe, as it's set during live reconfig from the main filestor manager.
-    void set_throttle_merge_feed_ops(bool throttle) noexcept {
-        _throttle_merge_feed_ops.store(throttle, std::memory_order_relaxed);
-    }
-
-    [[nodiscard]] bool throttle_merge_feed_ops() const noexcept {
-        return _throttle_merge_feed_ops.load(std::memory_order_relaxed);
-    }
-
 private:
     using DocEntryList = std::vector<std::unique_ptr<spi::DocEntry>>;
     const framework::Clock   &_clock;
@@ -99,36 +80,26 @@ private:
     spi::PersistenceProvider &_spi;
     std::unique_ptr<vespalib::MonitoredRefCount> _monitored_ref_count;
     const uint32_t            _maxChunkSize;
-    const uint32_t            _commonMergeChainOptimalizationMinimumSize;
     vespalib::ISequencedTaskExecutor& _executor;
-    std::atomic<bool>         _throttle_merge_feed_ops;

     MessageTrackerUP handleGetBucketDiffStage2(api::GetBucketDiffCommand&, MessageTrackerUP) const;
     /** Returns a reply if merge is complete */
-    api::StorageReply::SP processBucketMerge(const spi::Bucket& bucket,
-                                             MergeStatus& status,
-                                             MessageSender& sender,
-                                             spi::Context& context,
-                                             std::shared_ptr<ApplyBucketDiffState>& async_results) const;
+    api::StorageReply::SP processBucketMerge(const spi::Bucket& bucket, MergeStatus& status, MessageSender& sender,
+                                             spi::Context& context, std::shared_ptr<ApplyBucketDiffState>& async_results) const;

     /**
      * Invoke either put, remove or unrevertable remove on the SPI
      * depending on the flags in the diff entry.
      */
-    void applyDiffEntry(std::shared_ptr<ApplyBucketDiffState> async_results,
-                        const spi::Bucket&,
-                        const api::ApplyBucketDiffCommand::Entry&,
-                        const document::DocumentTypeRepo& repo) const;
+    void applyDiffEntry(std::shared_ptr<ApplyBucketDiffState> async_results, const spi::Bucket&,
+                        const api::ApplyBucketDiffCommand::Entry&, const document::DocumentTypeRepo& repo) const;

     /**
      * Fill entries-vector with metadata for bucket up to maxTimestamp,
      * sorted ascendingly on entry timestamp.
      * Throws std::runtime_error upon iteration failure.
      */
-    void populateMetaData(const spi::Bucket&,
-                          Timestamp maxTimestamp,
-                          DocEntryList & entries,
-                          spi::Context& context) const;
+    void populateMetaData(const spi::Bucket&, Timestamp maxTimestamp, DocEntryList & entries, spi::Context& context) const;

     std::unique_ptr<document::Document> deserializeDiffDocument(const api::ApplyBucketDiffCommand::Entry& e,
                                                                 const document::DocumentTypeRepo& repo) const;
diff --git a/storage/src/vespa/storage/persistence/persistencehandler.cpp b/storage/src/vespa/storage/persistence/persistencehandler.cpp
index 29d39845f5a..87c1f83794e 100644
--- a/storage/src/vespa/storage/persistence/persistencehandler.cpp
+++ b/storage/src/vespa/storage/persistence/persistencehandler.cpp
@@ -19,9 +19,7 @@ PersistenceHandler::PersistenceHandler(vespalib::ISequencedTaskExecutor & sequen
     : _clock(component.getClock()),
      _env(component, filestorHandler, metrics, provider),
      _processAllHandler(_env, provider),
-     _mergeHandler(_env, provider, component.cluster_context(), _clock, sequencedExecutor,
-                   cfg.bucketMergeChunkSize,
-                   cfg.commonMergeChainOptimalizationMinimumSize),
+     _mergeHandler(_env, provider, component.cluster_context(), _clock, sequencedExecutor, cfg.bucketMergeChunkSize),
      _asyncHandler(_env, provider, bucketOwnershipNotifier, sequencedExecutor, component.getBucketIdFactory()),
      _splitJoinHandler(_env, provider, bucketOwnershipNotifier, cfg.enableMultibitSplitOptimalization),
      _simpleHandler(_env, provider, component.getBucketIdFactory())
@@ -175,10 +173,4 @@ PersistenceHandler::processLockedMessage(FileStorHandler::LockedMessage lock) co
     }
 }

-void
-PersistenceHandler::set_throttle_merge_feed_ops(bool throttle) noexcept
-{
-    _mergeHandler.set_throttle_merge_feed_ops(throttle);
-}
-
 }
diff --git a/storage/src/vespa/storage/persistence/persistencehandler.h b/storage/src/vespa/storage/persistence/persistencehandler.h
index 595815d2bb3..1835b56528e 100644
--- a/storage/src/vespa/storage/persistence/persistencehandler.h
+++ b/storage/src/vespa/storage/persistence/persistencehandler.h
@@ -36,8 +36,6 @@ public:
     const AsyncHandler & asyncHandler() const { return _asyncHandler; }
    const SplitJoinHandler & splitjoinHandler() const { return _splitJoinHandler; }
    const SimpleMessageHandler & simpleMessageHandler() const { return _simpleHandler; }
-
-    void set_throttle_merge_feed_ops(bool throttle) noexcept;
 private:
    // Message handling functions
    MessageTracker::UP handleCommandSplitByType(api::StorageCommand&, MessageTracker::UP tracker) const;
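The other half of the change removes the commonMergeChainOptimalizationMinimumSize config knob: the threshold is now the hardcoded constant COMMON_MERGE_CHAIN_OPTIMIZATION_SIZE = 64u in mergehandler.cpp, matching the old default. The helper below is a hypothetical distillation of the decision in processBucketMerge(); only the constant and the comparison come from the diff, while the function name and the map-based bookkeeping are illustrative.

    // Hypothetical distillation of the path-splitting decision in
    // processBucketMerge(). entries_per_path maps a path mask (which nodes
    // still miss the entries) to the number of diff entries on that path.
    #include <cstdint>
    #include <map>
    #include <vector>

    constexpr uint32_t COMMON_MERGE_CHAIN_OPTIMIZATION_SIZE = 64u;

    // A path qualifies for its own apply chain only when it is big enough,
    // or when it is the sole path left.
    std::vector<uint16_t>
    paths_getting_separate_chain(const std::map<uint16_t, uint32_t>& entries_per_path)
    {
        std::vector<uint16_t> selected;
        for (const auto& [path_mask, entry_count] : entries_per_path) {
            if (path_mask == 0u) {
                continue; // nothing is missing on this path
            }
            if ((entry_count >= COMMON_MERGE_CHAIN_OPTIMIZATION_SIZE) || (entries_per_path.size() == 1)) {
                selected.push_back(path_mask);
            }
        }
        return selected;
    }

In this reading, a path only earns a separate ApplyBucketDiff chain when it carries at least 64 entries, unless it is the only path left; smaller paths ride along on the common merge chain.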