diff options
8 files changed, 139 insertions, 55 deletions
diff --git a/configdefinitions/src/vespa/stor-filestor.def b/configdefinitions/src/vespa/stor-filestor.def index 99f79a92c07..f1fc667ca9d 100644 --- a/configdefinitions/src/vespa/stor-filestor.def +++ b/configdefinitions/src/vespa/stor-filestor.def @@ -80,6 +80,22 @@ resource_usage_reporter_noise_level double default=0.001 ## - DYNAMIC uses DynamicThrottlePolicy under the hood and will block if the window ## is full (if a blocking throttler API call is invoked). ## +async_operation_throttler.type enum { UNLIMITED, DYNAMIC } default=UNLIMITED restart +## Internal throttler tuning parameters that only apply when type == DYNAMIC: +async_operation_throttler.window_size_increment int default=20 +async_operation_throttler.window_size_decrement_factor double default=1.2 +async_operation_throttler.window_size_backoff double default=0.95 + +## Specify throttling used for async persistence operations. This throttling takes place +## before operations are dispatched to Proton and serves as a limiter for how many +## operations may be in flight in Proton's internal queues. +## +## - UNLIMITED is, as it says on the tin, unlimited. Offers no actual throttling, but +## has near zero overhead and never blocks. +## - DYNAMIC uses DynamicThrottlePolicy under the hood and will block if the window +## is full (if a blocking throttler API call is invoked). +## +## TODO deprecate in favor of the async_operation_throttler struct instead. async_operation_throttler_type enum { UNLIMITED, DYNAMIC } default=UNLIMITED restart ## Specifies the extent the throttling window is increased by when the async throttle @@ -88,4 +104,5 @@ async_operation_throttler_type enum { UNLIMITED, DYNAMIC } default=UNLIMITED res ## value, number of threads). ## ## Only applies if async_operation_throttler_type == DYNAMIC. +## DEPRECATED! use the async_operation_throttler struct instead async_operation_dynamic_throttling_window_increment int default=20 restart diff --git a/searchlib/src/vespa/searchlib/diskindex/pagedict4file.cpp b/searchlib/src/vespa/searchlib/diskindex/pagedict4file.cpp index a5f49c2a3fe..96649d27b82 100644 --- a/searchlib/src/vespa/searchlib/diskindex/pagedict4file.cpp +++ b/searchlib/src/vespa/searchlib/diskindex/pagedict4file.cpp @@ -403,6 +403,7 @@ PageDict4FileSeqWrite::open(const vespalib::string &name, bool PageDict4FileSeqWrite::close() { + bool success = true; _pWriter->flush(); uint64_t usedPBits = _pe.getWriteOffset(); uint64_t usedSPBits = _spe.getWriteOffset(); @@ -415,28 +416,28 @@ PageDict4FileSeqWrite::close() _ssWriteContext.writeComprBuffer(true); _pWriteContext.dropComprBuf(); - _pfile.Sync(); - _pfile.Close(); + success &= _pfile.Sync(); + success &= _pfile.Close(); _pWriteContext.setFile(nullptr); _spWriteContext.dropComprBuf(); - _spfile.Sync(); - _spfile.Close(); + success &= _spfile.Sync(); + success &= _spfile.Close(); _spWriteContext.setFile(nullptr); _ssWriteContext.dropComprBuf(); - _ssfile.Sync(); - _ssfile.Close(); + success &= _ssfile.Sync(); + success &= _ssfile.Close(); _ssWriteContext.setFile(nullptr); // Update file headers - updatePHeader(usedPBits); - updateSPHeader(usedSPBits); - updateSSHeader(usedSSBits); + success &= updatePHeader(usedPBits); + success &= updateSPHeader(usedSPBits); + success &= updateSSHeader(usedSSBits); _pWriter.reset(); _spWriter.reset(); _ssWriter.reset(); - return true; + return success; } @@ -548,7 +549,7 @@ PageDict4FileSeqWrite::makeSSHeader(const FileHeaderContext &fileHeaderContext) } -void +bool PageDict4FileSeqWrite::updatePHeader(uint64_t fileBitSize) { vespalib::FileHeader h(FileSettings::DIRECTIO_ALIGNMENT); @@ -560,12 +561,13 @@ PageDict4FileSeqWrite::updatePHeader(uint64_t fileBitSize) h.putTag(Tag("frozen", 1)); h.putTag(Tag("fileBitSize", fileBitSize)); h.rewriteFile(f); - f.Sync(); - f.Close(); + bool success = f.Sync(); + success &= f.Close(); + return success; } -void +bool PageDict4FileSeqWrite::updateSPHeader(uint64_t fileBitSize) { vespalib::FileHeader h(FileSettings::DIRECTIO_ALIGNMENT); @@ -577,12 +579,13 @@ PageDict4FileSeqWrite::updateSPHeader(uint64_t fileBitSize) h.putTag(Tag("frozen", 1)); h.putTag(Tag("fileBitSize", fileBitSize)); h.rewriteFile(f); - f.Sync(); - f.Close(); + bool success = f.Sync(); + success &= f.Close(); + return success; } -void +bool PageDict4FileSeqWrite::updateSSHeader(uint64_t fileBitSize) { vespalib::FileHeader h(FileSettings::DIRECTIO_ALIGNMENT); @@ -597,8 +600,9 @@ PageDict4FileSeqWrite::updateSSHeader(uint64_t fileBitSize) assert(wordNum <= _sse._numWordIds); h.putTag(Tag("numWordIds", wordNum)); h.rewriteFile(f); - f.Sync(); - f.Close(); + bool success = f.Sync(); + success &= f.Close(); + return success; } diff --git a/searchlib/src/vespa/searchlib/diskindex/pagedict4file.h b/searchlib/src/vespa/searchlib/diskindex/pagedict4file.h index 0a68be7ae1b..8d2340c0b39 100644 --- a/searchlib/src/vespa/searchlib/diskindex/pagedict4file.h +++ b/searchlib/src/vespa/searchlib/diskindex/pagedict4file.h @@ -108,9 +108,9 @@ class PageDict4FileSeqWrite : public index::DictionaryFileSeqWrite void makePHeader(const FileHeaderContext &fileHeaderContext); void makeSPHeader(const FileHeaderContext &fileHeaderContext); void makeSSHeader(const FileHeaderContext &fileHeaderContext); - void updatePHeader(uint64_t fileBitSize); - void updateSPHeader(uint64_t fileBitSize); - void updateSSHeader(uint64_t fileBitSize); + bool updatePHeader(uint64_t fileBitSize); + bool updateSPHeader(uint64_t fileBitSize); + bool updateSSHeader(uint64_t fileBitSize); public: PageDict4FileSeqWrite(); ~PageDict4FileSeqWrite(); diff --git a/searchlib/src/vespa/searchlib/diskindex/zcposting.cpp b/searchlib/src/vespa/searchlib/diskindex/zcposting.cpp index f4bee9f6344..4441b868b15 100644 --- a/searchlib/src/vespa/searchlib/diskindex/zcposting.cpp +++ b/searchlib/src/vespa/searchlib/diskindex/zcposting.cpp @@ -258,7 +258,7 @@ Zc4PostingSeqWrite::makeHeader(const FileHeaderContext &fileHeaderContext) } -void +bool Zc4PostingSeqWrite::updateHeader() { vespalib::FileHeader h; @@ -271,8 +271,9 @@ Zc4PostingSeqWrite::updateHeader() h.putTag(Tag("fileBitSize", _fileBitSize)); h.putTag(Tag("numWords", _writer.get_num_words())); h.rewriteFile(f); - f.Sync(); - f.Close(); + bool success = f.Sync(); + success &= f.Close(); + return success; } @@ -320,11 +321,11 @@ Zc4PostingSeqWrite::close() _writer.on_close(); // flush and pad auto &writeContext = _writer.get_write_context(); writeContext.dropComprBuf(); - _file.Sync(); - _file.Close(); + bool success = _file.Sync(); + success &= _file.Close(); writeContext.setFile(nullptr); - updateHeader(); - return true; + success &= updateHeader(); + return success; } void diff --git a/searchlib/src/vespa/searchlib/diskindex/zcposting.h b/searchlib/src/vespa/searchlib/diskindex/zcposting.h index 24fccee9b8d..dc23fe5b37e 100644 --- a/searchlib/src/vespa/searchlib/diskindex/zcposting.h +++ b/searchlib/src/vespa/searchlib/diskindex/zcposting.h @@ -61,6 +61,11 @@ protected: FastOS_File _file; uint64_t _fileBitSize; index::PostingListCountFileSeqWrite *const _countFile; + /** + * Make header using feature encode write context. + */ + void makeHeader(const search::common::FileHeaderContext &fileHeaderContext); + bool updateHeader(); public: Zc4PostingSeqWrite(index::PostingListCountFileSeqWrite *countFile); ~Zc4PostingSeqWrite(); @@ -81,12 +86,6 @@ public: void getParams(PostingListParams ¶ms) override; void setFeatureParams(const PostingListParams ¶ms) override; void getFeatureParams(PostingListParams ¶ms) override; - - /** - * Make header using feature encode write context. - */ - void makeHeader(const search::common::FileHeaderContext &fileHeaderContext); - void updateHeader(); }; class ZcPostingSeqWrite : public Zc4PostingSeqWrite diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp index 03b16e75297..ec7793cdd8d 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp @@ -143,17 +143,29 @@ selectSequencer(StorFilestorConfig::ResponseSequencerType sequencerType) { } } +vespalib::SharedOperationThrottler::DynamicThrottleParams +dynamic_throttle_params_from_config(const StorFilestorConfig& config, size_t num_threads) +{ + const auto& cfg_params = config.asyncOperationThrottler; + auto win_size_incr = std::max(static_cast<size_t>(std::max(cfg_params.windowSizeIncrement, 1)), num_threads); + + vespalib::SharedOperationThrottler::DynamicThrottleParams params; + params.window_size_increment = win_size_incr; + params.min_window_size = win_size_incr; + params.window_size_decrement_factor = cfg_params.windowSizeDecrementFactor; + params.window_size_backoff = cfg_params.windowSizeBackoff; + return params; +} + std::unique_ptr<vespalib::SharedOperationThrottler> make_operation_throttler_from_config(const StorFilestorConfig& config, size_t num_threads) { - const bool use_dynamic_throttling = (config.asyncOperationThrottlerType == StorFilestorConfig::AsyncOperationThrottlerType::DYNAMIC); + // TODO only use struct config field instead once config model is updated + const bool use_dynamic_throttling = ((config.asyncOperationThrottlerType == StorFilestorConfig::AsyncOperationThrottlerType::DYNAMIC) || + (config.asyncOperationThrottler.type == StorFilestorConfig::AsyncOperationThrottler::Type::DYNAMIC)); if (use_dynamic_throttling) { - auto config_win_size_incr = std::max(config.asyncOperationDynamicThrottlingWindowIncrement, 1); - auto win_size_increment = std::max(static_cast<size_t>(config_win_size_incr), num_threads); - vespalib::SharedOperationThrottler::DynamicThrottleParams params; - params.window_size_increment = win_size_increment; - params.min_window_size = win_size_increment; - return vespalib::SharedOperationThrottler::make_dynamic_throttler(params); + auto dyn_params = dynamic_throttle_params_from_config(config, num_threads); + return vespalib::SharedOperationThrottler::make_dynamic_throttler(dyn_params); } else { return vespalib::SharedOperationThrottler::make_unlimited_throttler(); } @@ -229,6 +241,10 @@ FileStorManager::configure(std::unique_ptr<StorFilestorConfig> config) *_filestorHandler, i % numStripes, _component)); } _bucketExecutorRegistration = _provider->register_executor(std::make_shared<BucketExecutorWrapper>(*this)); + } else { + assert(_filestorHandler); + auto updated_dyn_throttle_params = dynamic_throttle_params_from_config(*config, _threads.size()); + _filestorHandler->operation_throttler().reconfigure_dynamic_throttling(updated_dyn_throttle_params); } } diff --git a/vespalib/src/vespa/vespalib/util/shared_operation_throttler.cpp b/vespalib/src/vespa/vespalib/util/shared_operation_throttler.cpp index dd790bcaa0a..1df5f85bb6b 100644 --- a/vespalib/src/vespa/vespalib/util/shared_operation_throttler.cpp +++ b/vespalib/src/vespa/vespalib/util/shared_operation_throttler.cpp @@ -23,6 +23,7 @@ public: } uint32_t current_window_size() const noexcept override { return 0; } uint32_t waiting_threads() const noexcept override { return 0; } + void reconfigure_dynamic_throttling(const DynamicThrottleParams&) noexcept override { /* no-op */ } private: void release_one() noexcept override { /* no-op */ } }; @@ -38,6 +39,7 @@ private: * messagebus/src/main/java/com/yahoo/messagebus/DynamicThrottlePolicy.java */ class DynamicThrottlePolicy { + SharedOperationThrottler::DynamicThrottleParams _active_params; std::function<steady_time()> _time_provider; uint32_t _num_sent; uint32_t _num_ok; @@ -66,6 +68,8 @@ public: void set_min_window_size(double min_size) noexcept; void set_window_size_decrement_factor(double decrement_factor) noexcept; + void configure(const SharedOperationThrottler::DynamicThrottleParams& params) noexcept; + [[nodiscard]] uint32_t current_window_size() const noexcept { return static_cast<uint32_t>(_window_size); } @@ -74,6 +78,8 @@ public: void process_response(bool success) noexcept; private: + void internal_unconditional_configure(const SharedOperationThrottler::DynamicThrottleParams& params) noexcept; + [[nodiscard]] uint64_t current_time_as_millis() noexcept { return count_ms(_time_provider().time_since_epoch()); } @@ -81,7 +87,8 @@ private: DynamicThrottlePolicy::DynamicThrottlePolicy(const SharedOperationThrottler::DynamicThrottleParams& params, std::function<steady_time()> time_provider) - : _time_provider(std::move(time_provider)), + : _active_params(params), + _time_provider(std::move(time_provider)), _num_sent(0), _num_ok(0), _resize_rate(3.0), @@ -98,14 +105,7 @@ DynamicThrottlePolicy::DynamicThrottlePolicy(const SharedOperationThrottler::Dyn _weight(1), _local_max_throughput(0) { - // We use setters for convenience, since setting one parameter may imply setting others, - // based on it, and there's frequently min/max capping of values. - set_window_size_increment(params.window_size_increment); - set_min_window_size(params.min_window_size); - set_max_window_size(params.max_window_size); - set_resize_rate(params.resize_rate); - set_window_size_decrement_factor(params.window_size_decrement_factor); - set_window_size_backoff(params.window_size_backoff); + internal_unconditional_configure(_active_params); } void @@ -146,6 +146,31 @@ DynamicThrottlePolicy::set_window_size_decrement_factor(double decrement_factor) _decrement_factor = decrement_factor; } +void +DynamicThrottlePolicy::internal_unconditional_configure(const SharedOperationThrottler::DynamicThrottleParams& params) noexcept +{ + // We use setters for convenience, since setting one parameter may imply setting others, + // based on it, and there's frequently min/max capping of values. + set_window_size_increment(params.window_size_increment); + set_min_window_size(params.min_window_size); + set_max_window_size(params.max_window_size); + set_resize_rate(params.resize_rate); + set_window_size_decrement_factor(params.window_size_decrement_factor); + set_window_size_backoff(params.window_size_backoff); +} + +void +DynamicThrottlePolicy::configure(const SharedOperationThrottler::DynamicThrottleParams& params) noexcept +{ + // To avoid any noise where setting parameters on the throttler may implicitly reduce the + // current window size (even though this isn't _currently_ the case), don't invoke any internal + // reconfiguration code unless the parameters have actually changed. + if (params != _active_params) { + internal_unconditional_configure(params); + _active_params = params; + } +} + bool DynamicThrottlePolicy::has_spare_capacity(uint32_t pending_count) noexcept { @@ -225,6 +250,7 @@ public: Token try_acquire_one() noexcept override; uint32_t current_window_size() const noexcept override; uint32_t waiting_threads() const noexcept override; + void reconfigure_dynamic_throttling(const DynamicThrottleParams& params) noexcept override; private: void release_one() noexcept override; // Non-const since actually checking the send window of a dynamic throttler might change @@ -340,6 +366,13 @@ DynamicOperationThrottler::waiting_threads() const noexcept return _waiting_threads; } +void +DynamicOperationThrottler::reconfigure_dynamic_throttling(const DynamicThrottleParams& params) noexcept +{ + std::unique_lock lock(_mutex); + _throttle_policy.configure(params); +} + } // anonymous namespace std::unique_ptr<SharedOperationThrottler> diff --git a/vespalib/src/vespa/vespalib/util/shared_operation_throttler.h b/vespalib/src/vespa/vespalib/util/shared_operation_throttler.h index 2a8a42dd1ba..030935339ed 100644 --- a/vespalib/src/vespa/vespalib/util/shared_operation_throttler.h +++ b/vespalib/src/vespa/vespalib/util/shared_operation_throttler.h @@ -51,9 +51,15 @@ public: virtual ~SharedOperationThrottler() = default; - // All methods are thread safe + // Acquire a valid throttling token, uninterruptedly blocking until one can be obtained. [[nodiscard]] virtual Token blocking_acquire_one() noexcept = 0; + // Attempt to acquire a valid throttling token, waiting up to `timeout` for one to be + // available. If the timeout is exceeded without any tokens becoming available, an + // invalid token will be returned. [[nodiscard]] virtual Token blocking_acquire_one(vespalib::duration timeout) noexcept = 0; + // Attempt to acquire a valid throttling token if one is immediately available. + // An invalid token will be returned if none is available. Never blocks (other than + // when contending for the internal throttler mutex). [[nodiscard]] virtual Token try_acquire_one() noexcept = 0; // May return 0, in which case the window size is unlimited. @@ -62,9 +68,6 @@ public: // Exposed for unit testing only. [[nodiscard]] virtual uint32_t waiting_threads() const noexcept = 0; - // Creates a throttler that does exactly zero throttling (but also has zero overhead and locking) - static std::unique_ptr<SharedOperationThrottler> make_unlimited_throttler(); - struct DynamicThrottleParams { uint32_t window_size_increment = 20; uint32_t min_window_size = 20; @@ -72,8 +75,19 @@ public: double resize_rate = 3.0; double window_size_decrement_factor = 1.2; double window_size_backoff = 0.95; + + bool operator==(const DynamicThrottleParams&) const noexcept = default; + bool operator!=(const DynamicThrottleParams&) const noexcept = default; }; + // No-op if underlying throttler does not use a dynamic policy, or if the supplied + // parameters are equal to the current configuration. + // FIXME leaky abstraction alert! + virtual void reconfigure_dynamic_throttling(const DynamicThrottleParams& params) noexcept = 0; + + // Creates a throttler that does exactly zero throttling (but also has zero overhead and locking) + static std::unique_ptr<SharedOperationThrottler> make_unlimited_throttler(); + // Creates a throttler that uses a DynamicThrottlePolicy under the hood static std::unique_ptr<SharedOperationThrottler> make_dynamic_throttler(const DynamicThrottleParams& params); static std::unique_ptr<SharedOperationThrottler> make_dynamic_throttler(const DynamicThrottleParams& params, |