diff options
author | Håvard Pettersen <havardpe@yahooinc.com> | 2022-05-23 11:43:08 +0000 |
---|---|---|
committer | Håvard Pettersen <havardpe@yahooinc.com> | 2022-05-23 11:43:08 +0000 |
commit | ba55f9c8787901143806f88d33db8bc393f6ff2e (patch) | |
tree | 67e836e539efe3fdd4bff2b29162acd80419c30f | |
parent | e657c0a9618868c9dcf32cfa7e05ac73750b904c (diff) |
use cached queue size for metrics to avoid lock inversion
-rw-r--r-- | storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp | 13 | ||||
-rw-r--r-- | storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h | 16 |
2 files changed, 24 insertions, 5 deletions
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp index cfcc32a76e5..a6b3a028512 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp @@ -232,7 +232,7 @@ FileStorHandlerImpl::getQueueSize() const { size_t sum(0); for (const auto & stripe : _stripes) { - sum += stripe.getQueueSize(); + sum += stripe.get_cached_queue_size(); } return sum; } @@ -755,7 +755,10 @@ FileStorHandlerImpl::remapQueueNoLock(const RemapInfo& source, std::vector<Remap stripe(bucket).exposeQueue().emplace_back(std::move(entry)); } } - + stripe(source.bucket).unsafe_update_cached_queue_size(); + for (const auto *target: targets) { + stripe(target->bucket).unsafe_update_cached_queue_size(); + } } void @@ -840,6 +843,7 @@ FileStorHandlerImpl::Stripe::failOperations(const document::Bucket &bucket, cons ++iter; } } + update_cached_queue_size(guard); } void @@ -909,6 +913,7 @@ FileStorHandlerImpl::Stripe::Stripe(const FileStorHandlerImpl & owner, MessageSe _lock(std::make_unique<std::mutex>()), _cond(std::make_unique<std::condition_variable>()), _queue(std::make_unique<PriorityQueue>()), + _cached_queue_size(_queue->size()), _lockedBuckets(), _active_merges(0), _active_operations_stats() @@ -1029,6 +1034,7 @@ FileStorHandlerImpl::Stripe::getMessage(monitor_guard & guard, PriorityIdx & idx std::shared_ptr<api::StorageMessage> msg = std::move(iter->_command); document::Bucket bucket(iter->_bucket); idx.erase(iter); // iter not used after this point. + update_cached_queue_size(guard); if (!messageTimedOutInQueue(*msg, waitTime)) { auto locker = std::make_unique<BucketLock>(guard, *this, bucket, msg->getPriority(), @@ -1088,6 +1094,7 @@ FileStorHandlerImpl::Stripe::abort(std::vector<std::shared_ptr<api::StorageReply ++it; } } + update_cached_queue_size(lockGuard); } bool @@ -1096,6 +1103,7 @@ FileStorHandlerImpl::Stripe::schedule(MessageEntry messageEntry) { std::lock_guard guard(*_lock); _queue->emplace_back(std::move(messageEntry)); + update_cached_queue_size(guard); } _cond->notify_all(); return true; @@ -1106,6 +1114,7 @@ FileStorHandlerImpl::Stripe::schedule_and_get_next_async_message(MessageEntry en { std::unique_lock guard(*_lock); _queue->emplace_back(std::move(entry)); + update_cached_queue_size(guard); auto lockedMessage = get_next_async_message(guard); if ( ! lockedMessage.msg) { if (guard.owns_lock()) { diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h index dbef1d06dad..c4b229418a1 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h @@ -30,6 +30,7 @@ #include <boost/multi_index/sequenced_index.hpp> #include <vespa/storage/common/messagesender.h> #include <vespa/vespalib/stllike/hash_map.h> +#include <vespa/vespalib/datastore/atomic_value_wrapper.h> #include <atomic> #include <optional> @@ -74,6 +75,7 @@ public: using BucketIdx = bmi::nth_index<PriorityQueue, 2>::type; using Clock = std::chrono::steady_clock; using monitor_guard = std::unique_lock<std::mutex>; + using atomic_size_t = vespalib::datastore::AtomicValueWrapper<size_t>; class Stripe { public: @@ -111,10 +113,11 @@ public: void broadcast() { _cond->notify_all(); } - size_t getQueueSize() const { - std::lock_guard guard(*_lock); - return _queue->size(); + size_t get_cached_queue_size() const { return _cached_queue_size.load_relaxed(); } + void unsafe_update_cached_queue_size() { + _cached_queue_size.store_relaxed(_queue->size()); } + void release(const document::Bucket & bucket, api::LockingRequirements reqOfReleasedLock, api::StorageMessage::Id lockMsgId, bool was_active_merge); void decrease_active_sync_merges_counter() noexcept; @@ -142,6 +145,12 @@ public: void setMetrics(FileStorStripeMetrics * metrics) { _metrics = metrics; } ActiveOperationsStats get_active_operations_stats(bool reset_min_max) const; private: + void update_cached_queue_size(const std::lock_guard<std::mutex> &) { + _cached_queue_size.store_relaxed(_queue->size()); + } + void update_cached_queue_size(const std::unique_lock<std::mutex> &) { + _cached_queue_size.store_relaxed(_queue->size()); + } bool hasActive(monitor_guard & monitor, const AbortBucketOperationsCommand& cmd) const; FileStorHandler::LockedMessage get_next_async_message(monitor_guard& guard); [[nodiscard]] bool operation_type_should_be_throttled(api::MessageType::Id type_id) const noexcept; @@ -158,6 +167,7 @@ public: std::unique_ptr<std::mutex> _lock; std::unique_ptr<std::condition_variable> _cond; std::unique_ptr<PriorityQueue> _queue; + atomic_size_t _cached_queue_size; LockedBuckets _lockedBuckets; uint32_t _active_merges; mutable ActiveOperationsStats _active_operations_stats; |