summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--configdefinitions/src/vespa/stor-filestor.def17
-rw-r--r--searchlib/src/vespa/searchlib/diskindex/pagedict4file.cpp42
-rw-r--r--searchlib/src/vespa/searchlib/diskindex/pagedict4file.h6
-rw-r--r--searchlib/src/vespa/searchlib/diskindex/zcposting.cpp15
-rw-r--r--searchlib/src/vespa/searchlib/diskindex/zcposting.h11
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp30
-rw-r--r--vespalib/src/vespa/vespalib/util/shared_operation_throttler.cpp51
-rw-r--r--vespalib/src/vespa/vespalib/util/shared_operation_throttler.h22
8 files changed, 139 insertions, 55 deletions
diff --git a/configdefinitions/src/vespa/stor-filestor.def b/configdefinitions/src/vespa/stor-filestor.def
index 99f79a92c07..f1fc667ca9d 100644
--- a/configdefinitions/src/vespa/stor-filestor.def
+++ b/configdefinitions/src/vespa/stor-filestor.def
@@ -80,6 +80,22 @@ resource_usage_reporter_noise_level double default=0.001
## - DYNAMIC uses DynamicThrottlePolicy under the hood and will block if the window
## is full (if a blocking throttler API call is invoked).
##
+async_operation_throttler.type enum { UNLIMITED, DYNAMIC } default=UNLIMITED restart
+## Internal throttler tuning parameters that only apply when type == DYNAMIC:
+async_operation_throttler.window_size_increment int default=20
+async_operation_throttler.window_size_decrement_factor double default=1.2
+async_operation_throttler.window_size_backoff double default=0.95
+
+## Specify throttling used for async persistence operations. This throttling takes place
+## before operations are dispatched to Proton and serves as a limiter for how many
+## operations may be in flight in Proton's internal queues.
+##
+## - UNLIMITED is, as it says on the tin, unlimited. Offers no actual throttling, but
+## has near zero overhead and never blocks.
+## - DYNAMIC uses DynamicThrottlePolicy under the hood and will block if the window
+## is full (if a blocking throttler API call is invoked).
+##
+## TODO deprecate in favor of the async_operation_throttler struct instead.
async_operation_throttler_type enum { UNLIMITED, DYNAMIC } default=UNLIMITED restart
## Specifies the extent the throttling window is increased by when the async throttle
@@ -88,4 +104,5 @@ async_operation_throttler_type enum { UNLIMITED, DYNAMIC } default=UNLIMITED res
## value, number of threads).
##
## Only applies if async_operation_throttler_type == DYNAMIC.
+## DEPRECATED! use the async_operation_throttler struct instead
async_operation_dynamic_throttling_window_increment int default=20 restart
diff --git a/searchlib/src/vespa/searchlib/diskindex/pagedict4file.cpp b/searchlib/src/vespa/searchlib/diskindex/pagedict4file.cpp
index a5f49c2a3fe..96649d27b82 100644
--- a/searchlib/src/vespa/searchlib/diskindex/pagedict4file.cpp
+++ b/searchlib/src/vespa/searchlib/diskindex/pagedict4file.cpp
@@ -403,6 +403,7 @@ PageDict4FileSeqWrite::open(const vespalib::string &name,
bool
PageDict4FileSeqWrite::close()
{
+ bool success = true;
_pWriter->flush();
uint64_t usedPBits = _pe.getWriteOffset();
uint64_t usedSPBits = _spe.getWriteOffset();
@@ -415,28 +416,28 @@ PageDict4FileSeqWrite::close()
_ssWriteContext.writeComprBuffer(true);
_pWriteContext.dropComprBuf();
- _pfile.Sync();
- _pfile.Close();
+ success &= _pfile.Sync();
+ success &= _pfile.Close();
_pWriteContext.setFile(nullptr);
_spWriteContext.dropComprBuf();
- _spfile.Sync();
- _spfile.Close();
+ success &= _spfile.Sync();
+ success &= _spfile.Close();
_spWriteContext.setFile(nullptr);
_ssWriteContext.dropComprBuf();
- _ssfile.Sync();
- _ssfile.Close();
+ success &= _ssfile.Sync();
+ success &= _ssfile.Close();
_ssWriteContext.setFile(nullptr);
// Update file headers
- updatePHeader(usedPBits);
- updateSPHeader(usedSPBits);
- updateSSHeader(usedSSBits);
+ success &= updatePHeader(usedPBits);
+ success &= updateSPHeader(usedSPBits);
+ success &= updateSSHeader(usedSSBits);
_pWriter.reset();
_spWriter.reset();
_ssWriter.reset();
- return true;
+ return success;
}
@@ -548,7 +549,7 @@ PageDict4FileSeqWrite::makeSSHeader(const FileHeaderContext &fileHeaderContext)
}
-void
+bool
PageDict4FileSeqWrite::updatePHeader(uint64_t fileBitSize)
{
vespalib::FileHeader h(FileSettings::DIRECTIO_ALIGNMENT);
@@ -560,12 +561,13 @@ PageDict4FileSeqWrite::updatePHeader(uint64_t fileBitSize)
h.putTag(Tag("frozen", 1));
h.putTag(Tag("fileBitSize", fileBitSize));
h.rewriteFile(f);
- f.Sync();
- f.Close();
+ bool success = f.Sync();
+ success &= f.Close();
+ return success;
}
-void
+bool
PageDict4FileSeqWrite::updateSPHeader(uint64_t fileBitSize)
{
vespalib::FileHeader h(FileSettings::DIRECTIO_ALIGNMENT);
@@ -577,12 +579,13 @@ PageDict4FileSeqWrite::updateSPHeader(uint64_t fileBitSize)
h.putTag(Tag("frozen", 1));
h.putTag(Tag("fileBitSize", fileBitSize));
h.rewriteFile(f);
- f.Sync();
- f.Close();
+ bool success = f.Sync();
+ success &= f.Close();
+ return success;
}
-void
+bool
PageDict4FileSeqWrite::updateSSHeader(uint64_t fileBitSize)
{
vespalib::FileHeader h(FileSettings::DIRECTIO_ALIGNMENT);
@@ -597,8 +600,9 @@ PageDict4FileSeqWrite::updateSSHeader(uint64_t fileBitSize)
assert(wordNum <= _sse._numWordIds);
h.putTag(Tag("numWordIds", wordNum));
h.rewriteFile(f);
- f.Sync();
- f.Close();
+ bool success = f.Sync();
+ success &= f.Close();
+ return success;
}
diff --git a/searchlib/src/vespa/searchlib/diskindex/pagedict4file.h b/searchlib/src/vespa/searchlib/diskindex/pagedict4file.h
index 0a68be7ae1b..8d2340c0b39 100644
--- a/searchlib/src/vespa/searchlib/diskindex/pagedict4file.h
+++ b/searchlib/src/vespa/searchlib/diskindex/pagedict4file.h
@@ -108,9 +108,9 @@ class PageDict4FileSeqWrite : public index::DictionaryFileSeqWrite
void makePHeader(const FileHeaderContext &fileHeaderContext);
void makeSPHeader(const FileHeaderContext &fileHeaderContext);
void makeSSHeader(const FileHeaderContext &fileHeaderContext);
- void updatePHeader(uint64_t fileBitSize);
- void updateSPHeader(uint64_t fileBitSize);
- void updateSSHeader(uint64_t fileBitSize);
+ bool updatePHeader(uint64_t fileBitSize);
+ bool updateSPHeader(uint64_t fileBitSize);
+ bool updateSSHeader(uint64_t fileBitSize);
public:
PageDict4FileSeqWrite();
~PageDict4FileSeqWrite();
diff --git a/searchlib/src/vespa/searchlib/diskindex/zcposting.cpp b/searchlib/src/vespa/searchlib/diskindex/zcposting.cpp
index f4bee9f6344..4441b868b15 100644
--- a/searchlib/src/vespa/searchlib/diskindex/zcposting.cpp
+++ b/searchlib/src/vespa/searchlib/diskindex/zcposting.cpp
@@ -258,7 +258,7 @@ Zc4PostingSeqWrite::makeHeader(const FileHeaderContext &fileHeaderContext)
}
-void
+bool
Zc4PostingSeqWrite::updateHeader()
{
vespalib::FileHeader h;
@@ -271,8 +271,9 @@ Zc4PostingSeqWrite::updateHeader()
h.putTag(Tag("fileBitSize", _fileBitSize));
h.putTag(Tag("numWords", _writer.get_num_words()));
h.rewriteFile(f);
- f.Sync();
- f.Close();
+ bool success = f.Sync();
+ success &= f.Close();
+ return success;
}
@@ -320,11 +321,11 @@ Zc4PostingSeqWrite::close()
_writer.on_close(); // flush and pad
auto &writeContext = _writer.get_write_context();
writeContext.dropComprBuf();
- _file.Sync();
- _file.Close();
+ bool success = _file.Sync();
+ success &= _file.Close();
writeContext.setFile(nullptr);
- updateHeader();
- return true;
+ success &= updateHeader();
+ return success;
}
void
diff --git a/searchlib/src/vespa/searchlib/diskindex/zcposting.h b/searchlib/src/vespa/searchlib/diskindex/zcposting.h
index 24fccee9b8d..dc23fe5b37e 100644
--- a/searchlib/src/vespa/searchlib/diskindex/zcposting.h
+++ b/searchlib/src/vespa/searchlib/diskindex/zcposting.h
@@ -61,6 +61,11 @@ protected:
FastOS_File _file;
uint64_t _fileBitSize;
index::PostingListCountFileSeqWrite *const _countFile;
+ /**
+ * Make header using feature encode write context.
+ */
+ void makeHeader(const search::common::FileHeaderContext &fileHeaderContext);
+ bool updateHeader();
public:
Zc4PostingSeqWrite(index::PostingListCountFileSeqWrite *countFile);
~Zc4PostingSeqWrite();
@@ -81,12 +86,6 @@ public:
void getParams(PostingListParams &params) override;
void setFeatureParams(const PostingListParams &params) override;
void getFeatureParams(PostingListParams &params) override;
-
- /**
- * Make header using feature encode write context.
- */
- void makeHeader(const search::common::FileHeaderContext &fileHeaderContext);
- void updateHeader();
};
class ZcPostingSeqWrite : public Zc4PostingSeqWrite
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
index 03b16e75297..ec7793cdd8d 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
@@ -143,17 +143,29 @@ selectSequencer(StorFilestorConfig::ResponseSequencerType sequencerType) {
}
}
+vespalib::SharedOperationThrottler::DynamicThrottleParams
+dynamic_throttle_params_from_config(const StorFilestorConfig& config, size_t num_threads)
+{
+ const auto& cfg_params = config.asyncOperationThrottler;
+ auto win_size_incr = std::max(static_cast<size_t>(std::max(cfg_params.windowSizeIncrement, 1)), num_threads);
+
+ vespalib::SharedOperationThrottler::DynamicThrottleParams params;
+ params.window_size_increment = win_size_incr;
+ params.min_window_size = win_size_incr;
+ params.window_size_decrement_factor = cfg_params.windowSizeDecrementFactor;
+ params.window_size_backoff = cfg_params.windowSizeBackoff;
+ return params;
+}
+
std::unique_ptr<vespalib::SharedOperationThrottler>
make_operation_throttler_from_config(const StorFilestorConfig& config, size_t num_threads)
{
- const bool use_dynamic_throttling = (config.asyncOperationThrottlerType == StorFilestorConfig::AsyncOperationThrottlerType::DYNAMIC);
+ // TODO only use struct config field instead once config model is updated
+ const bool use_dynamic_throttling = ((config.asyncOperationThrottlerType == StorFilestorConfig::AsyncOperationThrottlerType::DYNAMIC) ||
+ (config.asyncOperationThrottler.type == StorFilestorConfig::AsyncOperationThrottler::Type::DYNAMIC));
if (use_dynamic_throttling) {
- auto config_win_size_incr = std::max(config.asyncOperationDynamicThrottlingWindowIncrement, 1);
- auto win_size_increment = std::max(static_cast<size_t>(config_win_size_incr), num_threads);
- vespalib::SharedOperationThrottler::DynamicThrottleParams params;
- params.window_size_increment = win_size_increment;
- params.min_window_size = win_size_increment;
- return vespalib::SharedOperationThrottler::make_dynamic_throttler(params);
+ auto dyn_params = dynamic_throttle_params_from_config(config, num_threads);
+ return vespalib::SharedOperationThrottler::make_dynamic_throttler(dyn_params);
} else {
return vespalib::SharedOperationThrottler::make_unlimited_throttler();
}
@@ -229,6 +241,10 @@ FileStorManager::configure(std::unique_ptr<StorFilestorConfig> config)
*_filestorHandler, i % numStripes, _component));
}
_bucketExecutorRegistration = _provider->register_executor(std::make_shared<BucketExecutorWrapper>(*this));
+ } else {
+ assert(_filestorHandler);
+ auto updated_dyn_throttle_params = dynamic_throttle_params_from_config(*config, _threads.size());
+ _filestorHandler->operation_throttler().reconfigure_dynamic_throttling(updated_dyn_throttle_params);
}
}
diff --git a/vespalib/src/vespa/vespalib/util/shared_operation_throttler.cpp b/vespalib/src/vespa/vespalib/util/shared_operation_throttler.cpp
index dd790bcaa0a..1df5f85bb6b 100644
--- a/vespalib/src/vespa/vespalib/util/shared_operation_throttler.cpp
+++ b/vespalib/src/vespa/vespalib/util/shared_operation_throttler.cpp
@@ -23,6 +23,7 @@ public:
}
uint32_t current_window_size() const noexcept override { return 0; }
uint32_t waiting_threads() const noexcept override { return 0; }
+ void reconfigure_dynamic_throttling(const DynamicThrottleParams&) noexcept override { /* no-op */ }
private:
void release_one() noexcept override { /* no-op */ }
};
@@ -38,6 +39,7 @@ private:
* messagebus/src/main/java/com/yahoo/messagebus/DynamicThrottlePolicy.java
*/
class DynamicThrottlePolicy {
+ SharedOperationThrottler::DynamicThrottleParams _active_params;
std::function<steady_time()> _time_provider;
uint32_t _num_sent;
uint32_t _num_ok;
@@ -66,6 +68,8 @@ public:
void set_min_window_size(double min_size) noexcept;
void set_window_size_decrement_factor(double decrement_factor) noexcept;
+ void configure(const SharedOperationThrottler::DynamicThrottleParams& params) noexcept;
+
[[nodiscard]] uint32_t current_window_size() const noexcept {
return static_cast<uint32_t>(_window_size);
}
@@ -74,6 +78,8 @@ public:
void process_response(bool success) noexcept;
private:
+ void internal_unconditional_configure(const SharedOperationThrottler::DynamicThrottleParams& params) noexcept;
+
[[nodiscard]] uint64_t current_time_as_millis() noexcept {
return count_ms(_time_provider().time_since_epoch());
}
@@ -81,7 +87,8 @@ private:
DynamicThrottlePolicy::DynamicThrottlePolicy(const SharedOperationThrottler::DynamicThrottleParams& params,
std::function<steady_time()> time_provider)
- : _time_provider(std::move(time_provider)),
+ : _active_params(params),
+ _time_provider(std::move(time_provider)),
_num_sent(0),
_num_ok(0),
_resize_rate(3.0),
@@ -98,14 +105,7 @@ DynamicThrottlePolicy::DynamicThrottlePolicy(const SharedOperationThrottler::Dyn
_weight(1),
_local_max_throughput(0)
{
- // We use setters for convenience, since setting one parameter may imply setting others,
- // based on it, and there's frequently min/max capping of values.
- set_window_size_increment(params.window_size_increment);
- set_min_window_size(params.min_window_size);
- set_max_window_size(params.max_window_size);
- set_resize_rate(params.resize_rate);
- set_window_size_decrement_factor(params.window_size_decrement_factor);
- set_window_size_backoff(params.window_size_backoff);
+ internal_unconditional_configure(_active_params);
}
void
@@ -146,6 +146,31 @@ DynamicThrottlePolicy::set_window_size_decrement_factor(double decrement_factor)
_decrement_factor = decrement_factor;
}
+void
+DynamicThrottlePolicy::internal_unconditional_configure(const SharedOperationThrottler::DynamicThrottleParams& params) noexcept
+{
+ // We use setters for convenience, since setting one parameter may imply setting others,
+ // based on it, and there's frequently min/max capping of values.
+ set_window_size_increment(params.window_size_increment);
+ set_min_window_size(params.min_window_size);
+ set_max_window_size(params.max_window_size);
+ set_resize_rate(params.resize_rate);
+ set_window_size_decrement_factor(params.window_size_decrement_factor);
+ set_window_size_backoff(params.window_size_backoff);
+}
+
+void
+DynamicThrottlePolicy::configure(const SharedOperationThrottler::DynamicThrottleParams& params) noexcept
+{
+ // To avoid any noise where setting parameters on the throttler may implicitly reduce the
+ // current window size (even though this isn't _currently_ the case), don't invoke any internal
+ // reconfiguration code unless the parameters have actually changed.
+ if (params != _active_params) {
+ internal_unconditional_configure(params);
+ _active_params = params;
+ }
+}
+
bool
DynamicThrottlePolicy::has_spare_capacity(uint32_t pending_count) noexcept
{
@@ -225,6 +250,7 @@ public:
Token try_acquire_one() noexcept override;
uint32_t current_window_size() const noexcept override;
uint32_t waiting_threads() const noexcept override;
+ void reconfigure_dynamic_throttling(const DynamicThrottleParams& params) noexcept override;
private:
void release_one() noexcept override;
// Non-const since actually checking the send window of a dynamic throttler might change
@@ -340,6 +366,13 @@ DynamicOperationThrottler::waiting_threads() const noexcept
return _waiting_threads;
}
+void
+DynamicOperationThrottler::reconfigure_dynamic_throttling(const DynamicThrottleParams& params) noexcept
+{
+ std::unique_lock lock(_mutex);
+ _throttle_policy.configure(params);
+}
+
} // anonymous namespace
std::unique_ptr<SharedOperationThrottler>
diff --git a/vespalib/src/vespa/vespalib/util/shared_operation_throttler.h b/vespalib/src/vespa/vespalib/util/shared_operation_throttler.h
index 2a8a42dd1ba..030935339ed 100644
--- a/vespalib/src/vespa/vespalib/util/shared_operation_throttler.h
+++ b/vespalib/src/vespa/vespalib/util/shared_operation_throttler.h
@@ -51,9 +51,15 @@ public:
virtual ~SharedOperationThrottler() = default;
- // All methods are thread safe
+ // Acquire a valid throttling token, uninterruptedly blocking until one can be obtained.
[[nodiscard]] virtual Token blocking_acquire_one() noexcept = 0;
+ // Attempt to acquire a valid throttling token, waiting up to `timeout` for one to be
+ // available. If the timeout is exceeded without any tokens becoming available, an
+ // invalid token will be returned.
[[nodiscard]] virtual Token blocking_acquire_one(vespalib::duration timeout) noexcept = 0;
+ // Attempt to acquire a valid throttling token if one is immediately available.
+ // An invalid token will be returned if none is available. Never blocks (other than
+ // when contending for the internal throttler mutex).
[[nodiscard]] virtual Token try_acquire_one() noexcept = 0;
// May return 0, in which case the window size is unlimited.
@@ -62,9 +68,6 @@ public:
// Exposed for unit testing only.
[[nodiscard]] virtual uint32_t waiting_threads() const noexcept = 0;
- // Creates a throttler that does exactly zero throttling (but also has zero overhead and locking)
- static std::unique_ptr<SharedOperationThrottler> make_unlimited_throttler();
-
struct DynamicThrottleParams {
uint32_t window_size_increment = 20;
uint32_t min_window_size = 20;
@@ -72,8 +75,19 @@ public:
double resize_rate = 3.0;
double window_size_decrement_factor = 1.2;
double window_size_backoff = 0.95;
+
+ bool operator==(const DynamicThrottleParams&) const noexcept = default;
+ bool operator!=(const DynamicThrottleParams&) const noexcept = default;
};
+ // No-op if underlying throttler does not use a dynamic policy, or if the supplied
+ // parameters are equal to the current configuration.
+ // FIXME leaky abstraction alert!
+ virtual void reconfigure_dynamic_throttling(const DynamicThrottleParams& params) noexcept = 0;
+
+ // Creates a throttler that does exactly zero throttling (but also has zero overhead and locking)
+ static std::unique_ptr<SharedOperationThrottler> make_unlimited_throttler();
+
// Creates a throttler that uses a DynamicThrottlePolicy under the hood
static std::unique_ptr<SharedOperationThrottler> make_dynamic_throttler(const DynamicThrottleParams& params);
static std::unique_ptr<SharedOperationThrottler> make_dynamic_throttler(const DynamicThrottleParams& params,