summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHÃ¥vard Pettersen <3535158+havardpe@users.noreply.github.com>2022-05-23 14:10:41 +0200
committerGitHub <noreply@github.com>2022-05-23 14:10:41 +0200
commitb5ed98198b96556e6be40604f8ab93b54c71e645 (patch)
treed150db0a1e6d3c11f140640471aefe760264c4b1
parent78f729443365ea958257bec76812714601730d6f (diff)
parentba55f9c8787901143806f88d33db8bc393f6ff2e (diff)
Merge pull request #22707 from vespa-engine/havardpe/used-cached-queue-size
use cached queue size for metrics to avoid lock inversion
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp13
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h16
2 files changed, 24 insertions, 5 deletions
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
index cfcc32a76e5..a6b3a028512 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
@@ -232,7 +232,7 @@ FileStorHandlerImpl::getQueueSize() const
{
size_t sum(0);
for (const auto & stripe : _stripes) {
- sum += stripe.getQueueSize();
+ sum += stripe.get_cached_queue_size();
}
return sum;
}
@@ -755,7 +755,10 @@ FileStorHandlerImpl::remapQueueNoLock(const RemapInfo& source, std::vector<Remap
stripe(bucket).exposeQueue().emplace_back(std::move(entry));
}
}
-
+ stripe(source.bucket).unsafe_update_cached_queue_size();
+ for (const auto *target: targets) {
+ stripe(target->bucket).unsafe_update_cached_queue_size();
+ }
}
void
@@ -840,6 +843,7 @@ FileStorHandlerImpl::Stripe::failOperations(const document::Bucket &bucket, cons
++iter;
}
}
+ update_cached_queue_size(guard);
}
void
@@ -909,6 +913,7 @@ FileStorHandlerImpl::Stripe::Stripe(const FileStorHandlerImpl & owner, MessageSe
_lock(std::make_unique<std::mutex>()),
_cond(std::make_unique<std::condition_variable>()),
_queue(std::make_unique<PriorityQueue>()),
+ _cached_queue_size(_queue->size()),
_lockedBuckets(),
_active_merges(0),
_active_operations_stats()
@@ -1029,6 +1034,7 @@ FileStorHandlerImpl::Stripe::getMessage(monitor_guard & guard, PriorityIdx & idx
std::shared_ptr<api::StorageMessage> msg = std::move(iter->_command);
document::Bucket bucket(iter->_bucket);
idx.erase(iter); // iter not used after this point.
+ update_cached_queue_size(guard);
if (!messageTimedOutInQueue(*msg, waitTime)) {
auto locker = std::make_unique<BucketLock>(guard, *this, bucket, msg->getPriority(),
@@ -1088,6 +1094,7 @@ FileStorHandlerImpl::Stripe::abort(std::vector<std::shared_ptr<api::StorageReply
++it;
}
}
+ update_cached_queue_size(lockGuard);
}
bool
@@ -1096,6 +1103,7 @@ FileStorHandlerImpl::Stripe::schedule(MessageEntry messageEntry)
{
std::lock_guard guard(*_lock);
_queue->emplace_back(std::move(messageEntry));
+ update_cached_queue_size(guard);
}
_cond->notify_all();
return true;
@@ -1106,6 +1114,7 @@ FileStorHandlerImpl::Stripe::schedule_and_get_next_async_message(MessageEntry en
{
std::unique_lock guard(*_lock);
_queue->emplace_back(std::move(entry));
+ update_cached_queue_size(guard);
auto lockedMessage = get_next_async_message(guard);
if ( ! lockedMessage.msg) {
if (guard.owns_lock()) {
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
index dbef1d06dad..c4b229418a1 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
@@ -30,6 +30,7 @@
#include <boost/multi_index/sequenced_index.hpp>
#include <vespa/storage/common/messagesender.h>
#include <vespa/vespalib/stllike/hash_map.h>
+#include <vespa/vespalib/datastore/atomic_value_wrapper.h>
#include <atomic>
#include <optional>
@@ -74,6 +75,7 @@ public:
using BucketIdx = bmi::nth_index<PriorityQueue, 2>::type;
using Clock = std::chrono::steady_clock;
using monitor_guard = std::unique_lock<std::mutex>;
+ using atomic_size_t = vespalib::datastore::AtomicValueWrapper<size_t>;
class Stripe {
public:
@@ -111,10 +113,11 @@ public:
void broadcast() {
_cond->notify_all();
}
- size_t getQueueSize() const {
- std::lock_guard guard(*_lock);
- return _queue->size();
+ size_t get_cached_queue_size() const { return _cached_queue_size.load_relaxed(); }
+ void unsafe_update_cached_queue_size() {
+ _cached_queue_size.store_relaxed(_queue->size());
}
+
void release(const document::Bucket & bucket, api::LockingRequirements reqOfReleasedLock,
api::StorageMessage::Id lockMsgId, bool was_active_merge);
void decrease_active_sync_merges_counter() noexcept;
@@ -142,6 +145,12 @@ public:
void setMetrics(FileStorStripeMetrics * metrics) { _metrics = metrics; }
ActiveOperationsStats get_active_operations_stats(bool reset_min_max) const;
private:
+ void update_cached_queue_size(const std::lock_guard<std::mutex> &) {
+ _cached_queue_size.store_relaxed(_queue->size());
+ }
+ void update_cached_queue_size(const std::unique_lock<std::mutex> &) {
+ _cached_queue_size.store_relaxed(_queue->size());
+ }
bool hasActive(monitor_guard & monitor, const AbortBucketOperationsCommand& cmd) const;
FileStorHandler::LockedMessage get_next_async_message(monitor_guard& guard);
[[nodiscard]] bool operation_type_should_be_throttled(api::MessageType::Id type_id) const noexcept;
@@ -158,6 +167,7 @@ public:
std::unique_ptr<std::mutex> _lock;
std::unique_ptr<std::condition_variable> _cond;
std::unique_ptr<PriorityQueue> _queue;
+ atomic_size_t _cached_queue_size;
LockedBuckets _lockedBuckets;
uint32_t _active_merges;
mutable ActiveOperationsStats _active_operations_stats;