aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--configdefinitions/src/vespa/stor-filestor.def5
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandler.h2
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp14
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h12
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp9
-rw-r--r--storage/src/vespa/storage/persistence/mergehandler.cpp6
-rw-r--r--storage/src/vespa/storage/persistence/mergehandler.h10
-rw-r--r--storage/src/vespa/storage/persistence/persistencehandler.cpp6
-rw-r--r--storage/src/vespa/storage/persistence/persistencehandler.h2
9 files changed, 57 insertions, 9 deletions
diff --git a/configdefinitions/src/vespa/stor-filestor.def b/configdefinitions/src/vespa/stor-filestor.def
index 036ed47ddc9..e54b503ed93 100644
--- a/configdefinitions/src/vespa/stor-filestor.def
+++ b/configdefinitions/src/vespa/stor-filestor.def
@@ -88,6 +88,11 @@ async_operation_throttler.window_size_backoff double default=0.95
async_operation_throttler.min_window_size int default=20
async_operation_throttler.max_window_size int default=-1 # < 0 implies INT_MAX
async_operation_throttler.resize_rate double default=3.0
+## If true, each put/remove contained within a merge is individually throttled as if it
+## were a put/remove from a client. If false, merges are throttled at a persistence thread
+## level, i.e. per ApplyBucketDiff message, regardless of how many document operations
+## are contained within.
+async_operation_throttler.throttle_individual_merge_feed_ops bool default=true
## Specify throttling used for async persistence operations. This throttling takes place
## before operations are dispatched to Proton and serves as a limiter for how many
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
index 66dc7126058..5c243ea4af9 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
@@ -275,6 +275,8 @@ public:
virtual ActiveOperationsStats get_active_operations_stats(bool reset_min_max) const = 0;
virtual vespalib::SharedOperationThrottler& operation_throttler() const noexcept = 0;
+
+ virtual void set_throttle_apply_bucket_diff_ops(bool throttle_apply_bucket_diff) noexcept = 0;
};
} // storage
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
index 10df36579cd..b5de5a233cc 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
@@ -58,6 +58,7 @@ FileStorHandlerImpl::FileStorHandlerImpl(uint32_t numThreads, uint32_t numStripe
_getNextMessageTimeout(100ms),
_max_active_merges_per_stripe(per_stripe_merge_limit(numThreads, numStripes)),
_paused(false),
+ _throttle_apply_bucket_diff_ops(false),
_last_active_operations_stats()
{
assert(numStripes > 0);
@@ -125,7 +126,7 @@ FileStorHandlerImpl::clearMergeStatus(const document::Bucket& bucket, const api:
std::lock_guard mlock(_mergeStatesLock);
auto it = _mergeStates.find(bucket);
if (it == _mergeStates.end()) {
- if (code != 0) {
+ if (code != nullptr) {
LOG(debug, "Merge state not present at the time of clear. "
"Could not fail merge of bucket %s with code %s.",
bucket.toString().c_str(), code->toString().c_str());
@@ -135,7 +136,7 @@ FileStorHandlerImpl::clearMergeStatus(const document::Bucket& bucket, const api:
}
return;
}
- if (code != 0) {
+ if (code != nullptr) {
std::shared_ptr<MergeStatus> statusPtr(it->second);
assert(statusPtr.get());
MergeStatus& status(*statusPtr);
@@ -890,10 +891,8 @@ FileStorHandlerImpl::Stripe::Stripe(const FileStorHandlerImpl & owner, MessageSe
_active_operations_stats()
{}
-namespace {
-
bool
-operation_type_should_be_throttled(api::MessageType::Id type_id) noexcept
+FileStorHandlerImpl::Stripe::operation_type_should_be_throttled(api::MessageType::Id type_id) const noexcept
{
// Note: SetBucketState is intentionally _not_ included in this set, even though it's
// dispatched async. The rationale behind this is that SetBucketState is very cheap
@@ -911,13 +910,14 @@ operation_type_should_be_throttled(api::MessageType::Id type_id) noexcept
case api::MessageType::CREATEBUCKET_ID:
case api::MessageType::DELETEBUCKET_ID:
return true;
+ case api::MessageType::APPLYBUCKETDIFF_ID:
+ case api::MessageType::APPLYBUCKETDIFF_REPLY_ID:
+ return _owner.throttle_apply_bucket_diff_ops();
default:
return false;
}
}
-}
-
FileStorHandler::LockedMessage
FileStorHandlerImpl::Stripe::getNextMessage(vespalib::duration timeout)
{
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
index 698f52359f5..1bc0ab87b1c 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
@@ -144,6 +144,7 @@ public:
private:
bool hasActive(monitor_guard & monitor, const AbortBucketOperationsCommand& cmd) const;
FileStorHandler::LockedMessage get_next_async_message(monitor_guard& guard);
+ [[nodiscard]] bool operation_type_should_be_throttled(api::MessageType::Id type_id) const noexcept;
// Precondition: the bucket used by `iter`s operation is not locked in a way that conflicts
// with its locking requirements.
@@ -247,6 +248,12 @@ public:
return *_operation_throttler;
}
+ void set_throttle_apply_bucket_diff_ops(bool throttle_apply_bucket_diff) noexcept override {
+ // Relaxed is fine, worst case from temporarily observing a stale value is that
+ // an ApplyBucketDiff message is (or isn't) throttled at a high level.
+ _throttle_apply_bucket_diff_ops.store(throttle_apply_bucket_diff, std::memory_order_relaxed);
+ }
+
// Implements ResumeGuard::Callback
void resume() override;
@@ -268,6 +275,7 @@ private:
mutable std::mutex _pauseMonitor;
mutable std::condition_variable _pauseCond;
std::atomic<bool> _paused;
+ std::atomic<bool> _throttle_apply_bucket_diff_ops;
std::optional<ActiveOperationsStats> _last_active_operations_stats;
// Returns the index in the targets array we are sending to, or -1 if none of them match.
@@ -287,6 +295,10 @@ private:
*/
bool isPaused() const { return _paused.load(std::memory_order_relaxed); }
+ [[nodiscard]] bool throttle_apply_bucket_diff_ops() const noexcept {
+ return _throttle_apply_bucket_diff_ops.load(std::memory_order_relaxed);
+ }
+
/**
* Return whether msg has timed out based on waitTime and the message's
* specified timeout.
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
index e71b818b1ea..f4aff96b53c 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
@@ -228,6 +228,7 @@ FileStorManager::configure(std::unique_ptr<StorFilestorConfig> config)
_use_async_message_handling_on_schedule = config->useAsyncMessageHandlingOnSchedule;
_host_info_reporter.set_noise_level(config->resourceUsageReporterNoiseLevel);
+ const bool throttle_merge_feed_ops = config->asyncOperationThrottler.throttleIndividualMergeFeedOps;
if (!liveUpdate) {
_config = std::move(config);
@@ -254,6 +255,14 @@ FileStorManager::configure(std::unique_ptr<StorFilestorConfig> config)
auto updated_dyn_throttle_params = dynamic_throttle_params_from_config(*config, _threads.size());
_filestorHandler->operation_throttler().reconfigure_dynamic_throttling(updated_dyn_throttle_params);
}
+ // TODO remove once desired dynamic throttling behavior is set in stone
+ {
+ _filestorHandler->set_throttle_apply_bucket_diff_ops(!throttle_merge_feed_ops);
+ std::lock_guard guard(_lock);
+ for (auto& ph : _persistenceHandlers) {
+ ph->set_throttle_merge_feed_ops(throttle_merge_feed_ops);
+ }
+ }
}
void
diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp
index 4a5362f3d8d..78f53de46b3 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.cpp
+++ b/storage/src/vespa/storage/persistence/mergehandler.cpp
@@ -36,7 +36,8 @@ MergeHandler::MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi,
_monitored_ref_count(std::make_unique<MonitoredRefCount>()),
_maxChunkSize(maxChunkSize),
_commonMergeChainOptimalizationMinimumSize(commonMergeChainOptimalizationMinimumSize),
- _executor(executor)
+ _executor(executor),
+ _throttle_merge_feed_ops(true)
{
}
@@ -514,7 +515,8 @@ MergeHandler::applyDiffEntry(std::shared_ptr<ApplyBucketDiffState> async_results
spi::Context& context,
const document::DocumentTypeRepo& repo) const
{
- auto throttle_token = _operation_throttler.blocking_acquire_one();
+ auto throttle_token = throttle_merge_feed_ops() ? _operation_throttler.blocking_acquire_one()
+ : vespalib::SharedOperationThrottler::Token();
spi::Timestamp timestamp(e._entry._timestamp);
if (!(e._entry._flags & (DELETED | DELETED_IN_PLACE))) {
// Regular put entry
diff --git a/storage/src/vespa/storage/persistence/mergehandler.h b/storage/src/vespa/storage/persistence/mergehandler.h
index ee6eed63eb5..93fb7efc8d0 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.h
+++ b/storage/src/vespa/storage/persistence/mergehandler.h
@@ -81,6 +81,15 @@ public:
void handleApplyBucketDiffReply(api::ApplyBucketDiffReply&, MessageSender&, MessageTrackerUP) const;
void drain_async_writes();
+ // Thread safe, as it's set during live reconfig from the main filestor manager.
+ void set_throttle_merge_feed_ops(bool throttle) noexcept {
+ _throttle_merge_feed_ops.store(throttle, std::memory_order_relaxed);
+ }
+
+ [[nodiscard]] bool throttle_merge_feed_ops() const noexcept {
+ return _throttle_merge_feed_ops.load(std::memory_order_relaxed);
+ }
+
private:
using DocEntryList = std::vector<std::unique_ptr<spi::DocEntry>>;
const framework::Clock &_clock;
@@ -92,6 +101,7 @@ private:
const uint32_t _maxChunkSize;
const uint32_t _commonMergeChainOptimalizationMinimumSize;
vespalib::ISequencedTaskExecutor& _executor;
+ std::atomic<bool> _throttle_merge_feed_ops;
MessageTrackerUP handleGetBucketDiffStage2(api::GetBucketDiffCommand&, MessageTrackerUP) const;
/** Returns a reply if merge is complete */
diff --git a/storage/src/vespa/storage/persistence/persistencehandler.cpp b/storage/src/vespa/storage/persistence/persistencehandler.cpp
index 41df1e8a075..8d71cc9308b 100644
--- a/storage/src/vespa/storage/persistence/persistencehandler.cpp
+++ b/storage/src/vespa/storage/persistence/persistencehandler.cpp
@@ -181,4 +181,10 @@ PersistenceHandler::processLockedMessage(FileStorHandler::LockedMessage lock) co
}
}
+void
+PersistenceHandler::set_throttle_merge_feed_ops(bool throttle) noexcept
+{
+ _mergeHandler.set_throttle_merge_feed_ops(throttle);
+}
+
}
diff --git a/storage/src/vespa/storage/persistence/persistencehandler.h b/storage/src/vespa/storage/persistence/persistencehandler.h
index a92c2dc78ca..c2df52e2fd6 100644
--- a/storage/src/vespa/storage/persistence/persistencehandler.h
+++ b/storage/src/vespa/storage/persistence/persistencehandler.h
@@ -35,6 +35,8 @@ public:
const AsyncHandler & asyncHandler() const { return _asyncHandler; }
const SplitJoinHandler & splitjoinHandler() const { return _splitJoinHandler; }
const SimpleMessageHandler & simpleMessageHandler() const { return _simpleHandler; }
+
+ void set_throttle_merge_feed_ops(bool throttle) noexcept;
private:
// Message handling functions
MessageTracker::UP handleCommandSplitByType(api::StorageCommand&, MessageTracker::UP tracker) const;