diff options
7 files changed, 10 insertions, 45 deletions
diff --git a/config-model/src/test/java/com/yahoo/vespa/model/content/StorageClusterTest.java b/config-model/src/test/java/com/yahoo/vespa/model/content/StorageClusterTest.java index 7c3e66aa109..46590d18b40 100644 --- a/config-model/src/test/java/com/yahoo/vespa/model/content/StorageClusterTest.java +++ b/config-model/src/test/java/com/yahoo/vespa/model/content/StorageClusterTest.java @@ -337,7 +337,6 @@ public class StorageClusterTest { assertEquals(20, config.async_operation_throttler().min_window_size()); assertEquals(-1, config.async_operation_throttler().max_window_size()); // <=0 implies +inf assertEquals(3.0, config.async_operation_throttler().resize_rate(), 0.0001); - assertTrue(config.async_operation_throttler().throttle_individual_merge_feed_ops()); } @Test diff --git a/configdefinitions/src/vespa/stor-filestor.def b/configdefinitions/src/vespa/stor-filestor.def index cefce5fc648..93650d9a328 100644 --- a/configdefinitions/src/vespa/stor-filestor.def +++ b/configdefinitions/src/vespa/stor-filestor.def @@ -27,6 +27,7 @@ response_sequencer_type enum {LATENCY, THROUGHPUT, ADAPTIVE} default=ADAPTIVE re ## When merging, if we find more than this number of documents that exist on all ## of the same copies, send a separate apply bucket diff with these entries ## to an optimized merge chain that guarantuees minimum data transfer. +## TODO GC and hardcode common_merge_chain_optimalization_minimum_size int default=64 restart ## Chunksize to use while merging buckets between nodes. @@ -37,6 +38,7 @@ bucket_merge_chunk_size int default=16772216 restart ## Whether or not to enable the multibit split optimalization. This is useful ## if splitting is expensive, but listing document identifiers is fairly cheap. ## This is true for memfile persistence layer, but not for vespa search. +## TODO verify its usage enable_multibit_split_optimalization bool default=true restart ## Whether or not to use async message handling when scheduling storage messages from FileStorManager. @@ -71,8 +73,3 @@ async_operation_throttler.window_size_backoff double default=0.95 async_operation_throttler.min_window_size int default=20 async_operation_throttler.max_window_size int default=-1 # < 0 implies INT_MAX async_operation_throttler.resize_rate double default=3.0 -## If true, each put/remove contained within a merge is individually throttled as if it -## were a put/remove from a client. If false, merges are throttled at a persistence thread -## level, i.e. per ApplyBucketDiff message, regardless of how many document operations -## are contained within. -async_operation_throttler.throttle_individual_merge_feed_ops bool default=true diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp index 90703050009..093c11fb913 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp @@ -25,7 +25,6 @@ #include <vespa/vespalib/util/sequencedtaskexecutor.h> #include <vespa/vespalib/util/string_escape.h> #include <vespa/vespalib/util/stringfmt.h> -#include <vespa/config/subscription/configuri.h> #include <vespa/config/helper/configfetcher.hpp> #include <thread> @@ -49,7 +48,7 @@ namespace { class BucketExecutorWrapper : public spi::BucketExecutor { public: - BucketExecutorWrapper(spi::BucketExecutor & executor) noexcept : _executor(executor) { } + explicit BucketExecutorWrapper(spi::BucketExecutor & executor) noexcept : _executor(executor) { } void execute(const spi::Bucket &bucket, std::unique_ptr<spi::BucketTask> task) override { _executor.execute(bucket, std::move(task)); @@ -213,7 +212,6 @@ FileStorManager::on_configure(const StorFilestorConfig& config) _use_async_message_handling_on_schedule = config.useAsyncMessageHandlingOnSchedule; _host_info_reporter.set_noise_level(config.resourceUsageReporterNoiseLevel); const bool use_dynamic_throttling = (config.asyncOperationThrottler.type == StorFilestorConfig::AsyncOperationThrottler::Type::DYNAMIC); - const bool throttle_merge_feed_ops = config.asyncOperationThrottler.throttleIndividualMergeFeedOps; if (!liveUpdate) { _config = std::make_unique<StorFilestorConfig>(config); @@ -243,11 +241,7 @@ FileStorManager::on_configure(const StorFilestorConfig& config) // TODO remove once desired throttling behavior is set in stone { _filestorHandler->use_dynamic_operation_throttling(use_dynamic_throttling); - _filestorHandler->set_throttle_apply_bucket_diff_ops(!throttle_merge_feed_ops); - std::lock_guard guard(_lock); - for (auto& ph : _persistenceHandlers) { - ph->set_throttle_merge_feed_ops(throttle_merge_feed_ops); - } + _filestorHandler->set_throttle_apply_bucket_diff_ops(false); } } @@ -312,7 +306,7 @@ FileStorManager::mapOperationToBucketAndDisk(api::BucketCommand& cmd, const docu if (results.size() > 1) { error << "Bucket was inconsistent with " << results.size() << " entries so no automatic remapping done:"; - BucketMap::const_iterator it = results.begin(); + auto it = results.begin(); for (uint32_t i=0; i <= 4 && it != results.end(); ++it, ++i) { error << " " << it->first; } @@ -551,10 +545,7 @@ FileStorManager::onDeleteBucket(const shared_ptr<api::DeleteBucketCommand>& cmd) StorBucketDatabase::WrappedEntry -FileStorManager::ensureConsistentBucket( - const document::Bucket& bucket, - api::StorageMessage& msg, - const char* callerId) +FileStorManager::ensureConsistentBucket(const document::Bucket& bucket, api::StorageMessage& msg, const char* callerId) { StorBucketDatabase::WrappedEntry entry(_component.getBucketDatabase(bucket.getBucketSpace()).get( bucket.getBucketId(), callerId, StorBucketDatabase::CREATE_IF_NONEXISTING)); @@ -565,7 +556,7 @@ FileStorManager::ensureConsistentBucket( entry.remove(); } replyDroppedOperation(msg, bucket, api::ReturnCode::ABORTED, "bucket is inconsistently split"); - return StorBucketDatabase::WrappedEntry(); + return {}; } return entry; @@ -899,7 +890,7 @@ FileStorManager::maintenance_in_all_spaces(const lib::Node& node) const noexcept if (!derived_cluster_state->getNodeState(node).getState().oneOf("m")) { return false; } - }; + } return true; } diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp index 1b7041583e8..b1e36147e30 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.cpp +++ b/storage/src/vespa/storage/persistence/mergehandler.cpp @@ -2,7 +2,6 @@ #include "mergehandler.h" #include "persistenceutil.h" -#include "shared_operation_throttler.h" #include "apply_bucket_diff_entry_complete.h" #include "apply_bucket_diff_state.h" #include <vespa/storage/persistence/filestorage/mergestatus.h> @@ -37,8 +36,7 @@ MergeHandler::MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi, _monitored_ref_count(std::make_unique<MonitoredRefCount>()), _maxChunkSize(maxChunkSize), _commonMergeChainOptimalizationMinimumSize(commonMergeChainOptimalizationMinimumSize), - _executor(executor), - _throttle_merge_feed_ops(true) + _executor(executor) { } @@ -511,8 +509,7 @@ MergeHandler::applyDiffEntry(std::shared_ptr<ApplyBucketDiffState> async_results const api::ApplyBucketDiffCommand::Entry& e, const document::DocumentTypeRepo& repo) const { - auto throttle_token = throttle_merge_feed_ops() ? _env._fileStorHandler.operation_throttler().blocking_acquire_one() - : vespalib::SharedOperationThrottler::Token(); + auto throttle_token = _env._fileStorHandler.operation_throttler().blocking_acquire_one(); spi::Timestamp timestamp(e._entry._timestamp); if (!(e._entry._flags & (DELETED | DELETED_IN_PLACE))) { // Regular put entry diff --git a/storage/src/vespa/storage/persistence/mergehandler.h b/storage/src/vespa/storage/persistence/mergehandler.h index 43b51662fe6..0f9f01b4cb1 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.h +++ b/storage/src/vespa/storage/persistence/mergehandler.h @@ -20,7 +20,6 @@ #include <vespa/storage/common/messagesender.h> #include <vespa/vespalib/util/monitored_refcount.h> #include <vespa/storageframework/generic/clock/time.h> -#include <atomic> namespace vespalib { class ISequencedTaskExecutor; } namespace document { class Document; } @@ -82,15 +81,6 @@ public: void handleApplyBucketDiffReply(api::ApplyBucketDiffReply&, MessageSender&, MessageTrackerUP) const; void drain_async_writes(); - // Thread safe, as it's set during live reconfig from the main filestor manager. - void set_throttle_merge_feed_ops(bool throttle) noexcept { - _throttle_merge_feed_ops.store(throttle, std::memory_order_relaxed); - } - - [[nodiscard]] bool throttle_merge_feed_ops() const noexcept { - return _throttle_merge_feed_ops.load(std::memory_order_relaxed); - } - private: using DocEntryList = std::vector<std::unique_ptr<spi::DocEntry>>; const framework::Clock &_clock; @@ -101,7 +91,6 @@ private: const uint32_t _maxChunkSize; const uint32_t _commonMergeChainOptimalizationMinimumSize; vespalib::ISequencedTaskExecutor& _executor; - std::atomic<bool> _throttle_merge_feed_ops; MessageTrackerUP handleGetBucketDiffStage2(api::GetBucketDiffCommand&, MessageTrackerUP) const; /** Returns a reply if merge is complete */ diff --git a/storage/src/vespa/storage/persistence/persistencehandler.cpp b/storage/src/vespa/storage/persistence/persistencehandler.cpp index 29d39845f5a..2f7fbb99290 100644 --- a/storage/src/vespa/storage/persistence/persistencehandler.cpp +++ b/storage/src/vespa/storage/persistence/persistencehandler.cpp @@ -175,10 +175,4 @@ PersistenceHandler::processLockedMessage(FileStorHandler::LockedMessage lock) co } } -void -PersistenceHandler::set_throttle_merge_feed_ops(bool throttle) noexcept -{ - _mergeHandler.set_throttle_merge_feed_ops(throttle); -} - } diff --git a/storage/src/vespa/storage/persistence/persistencehandler.h b/storage/src/vespa/storage/persistence/persistencehandler.h index 595815d2bb3..1835b56528e 100644 --- a/storage/src/vespa/storage/persistence/persistencehandler.h +++ b/storage/src/vespa/storage/persistence/persistencehandler.h @@ -36,8 +36,6 @@ public: const AsyncHandler & asyncHandler() const { return _asyncHandler; } const SplitJoinHandler & splitjoinHandler() const { return _splitJoinHandler; } const SimpleMessageHandler & simpleMessageHandler() const { return _simpleHandler; } - - void set_throttle_merge_feed_ops(bool throttle) noexcept; private: // Message handling functions MessageTracker::UP handleCommandSplitByType(api::StorageCommand&, MessageTracker::UP tracker) const; |