summary | refs | log | tree | commit | diff | stats
path: root/storage
diff options
context:
space:
mode:
author: Tor Brede Vekterli <vekterli@verizonmedia.com> 2020-03-04 14:16:37 +0000
committer: Tor Brede Vekterli <vekterli@verizonmedia.com> 2020-03-04 14:32:05 +0000
commit: 0c007702e684c0e596f9538d4997604a07c745af (patch)
tree: 055c5a77351ac20cb30b7ba9d7f64b43d037e872 /storage
parent: d126d46780082201da2daa9beb607169c7bacfcd (diff)
Limit merges per stripe, not globally
With a sufficiently large, even thread count this ensures that no stripe ends up completely blocked on processing merges, which can starve client operations. Having a global limit meant that it was possible for individual stripes to fill up completely with merges. As an added bonus, moving the limit tracking to individual stripes means that we no longer have to track the active merge count as an atomic, since all access already happens under the Stripe lock.
Diffstat (limited to 'storage')
-rw-r--r-- storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp 34
-rw-r--r-- storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h 34
2 files changed, 32 insertions, 36 deletions
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
index 080446c1c92..355d0eb8cc6 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
@@ -24,15 +24,14 @@ namespace storage {
namespace {
-uint32_t merge_soft_limit_from_thread_count(uint32_t num_threads) noexcept {
+uint32_t per_stripe_merge_limit(uint32_t num_threads, uint32_t num_stripes) noexcept {
// Rationale: to avoid starving client ops we want to ensure that not all persistence
- // threads can be blocked by processing merges all at the same time. We therefore allocate
- // half of the threads to non-merge operations.
- // This a _soft_ limit since the current operation locking design means there is a small
- // window of time between when the limit is checked and when its updated. There are no
- // correctness violations as a consequence of this, but non-merge liveness may be impacted.
- // There must always be at least 1 thread that can process merges, or the system would stall.
- return std::max(1u, num_threads / 2);
+ // threads in any given stripe can be blocked by processing merges all at the same time.
+ // We therefore allocate half of the per-stripe threads to non-merge operations.
+ // Note that if the _total_ number of threads is small and odd (e.g. 3 or 5), it's still
+ // possible to have a stripe where all threads are busy processing merges because there
+ // is only 1 thread in the stripe in total.
+ return std::max(1u, (num_threads / num_stripes) / 2);
}
}
@@ -46,8 +45,7 @@ FileStorHandlerImpl::FileStorHandlerImpl(uint32_t numThreads, uint32_t numStripe
_messageSender(sender),
_bucketIdFactory(_component.getBucketIdFactory()),
_getNextMessageTimeout(100),
- _activeMergesSoftLimit(merge_soft_limit_from_thread_count(numThreads)),
- _activeMerges(0),
+ _max_active_merges_per_stripe(per_stripe_merge_limit(numThreads, numStripes)),
_paused(false)
{
_diskInfo.reserve(_component.getDiskCount());
@@ -56,7 +54,7 @@ FileStorHandlerImpl::FileStorHandlerImpl(uint32_t numThreads, uint32_t numStripe
}
for (uint32_t i=0; i<_diskInfo.size(); ++i) {
_diskInfo[i].metrics = metrics.disks[i].get();
- assert(_diskInfo[i].metrics != 0);
+ assert(_diskInfo[i].metrics != nullptr);
uint32_t j(0);
for (Stripe & stripe : _diskInfo[i].getStripes()) {
stripe.setMetrics(metrics.disks[i]->stripes[j++].get());
@@ -930,8 +928,10 @@ FileStorHandlerImpl::Disk::schedule(const std::shared_ptr<api::StorageMessage>&
FileStorHandlerImpl::Stripe::Stripe(const FileStorHandlerImpl & owner, MessageSender & messageSender)
: _owner(owner),
- _messageSender(messageSender)
-{ }
+ _messageSender(messageSender),
+ _active_merges(0)
+{}
+
FileStorHandler::LockedMessage
FileStorHandlerImpl::Stripe::getNextMessage(uint32_t timeout, Disk & disk)
{
@@ -1124,8 +1124,8 @@ void FileStorHandlerImpl::Stripe::release(const document::Bucket & bucket,
assert(entry._exclusiveLock);
assert(entry._exclusiveLock->msgId == lockMsgId);
if (entry._exclusiveLock->msgType == api::MessageType::MERGEBUCKET_ID) {
- auto before = _owner._activeMerges.fetch_sub(1, std::memory_order_relaxed);
- assert(before > 0);
+ assert(_active_merges > 0);
+ --_active_merges;
}
entry._exclusiveLock.reset();
} else {
@@ -1148,7 +1148,7 @@ void FileStorHandlerImpl::Stripe::lock(const vespalib::MonitorGuard &, const doc
if (lockReq == api::LockingRequirements::Exclusive) {
assert(entry._sharedLocks.empty());
if (lockEntry.msgType == api::MessageType::MERGEBUCKET_ID) {
- _owner._activeMerges.fetch_add(1, std::memory_order_relaxed);
+ ++_active_merges;
}
entry._exclusiveLock = lockEntry;
} else {
@@ -1184,7 +1184,7 @@ FileStorHandlerImpl::Stripe::operationIsInhibited(const vespalib::MonitorGuard&
const api::StorageMessage& msg) const noexcept
{
if ((msg.getType() == api::MessageType::MERGEBUCKET)
- && (_owner._activeMerges.load(std::memory_order_relaxed) > _owner._activeMergesSoftLimit))
+ && (_active_merges >= _owner._max_active_merges_per_stripe))
{
return true;
}
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
index 44756ed5891..253cb84caeb 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
@@ -144,12 +144,13 @@ public:
FileStorHandler::LockedMessage getMessage(vespalib::MonitorGuard & guard, PriorityIdx & idx,
PriorityIdx::iterator iter);
using LockedBuckets = vespalib::hash_map<document::Bucket, MultiLockEntry, document::Bucket::hash>;
- const FileStorHandlerImpl &_owner;
- MessageSender &_messageSender;
- FileStorStripeMetrics *_metrics;
- vespalib::Monitor _lock;
- PriorityQueue _queue;
- LockedBuckets _lockedBuckets;
+ const FileStorHandlerImpl &_owner;
+ MessageSender &_messageSender;
+ FileStorStripeMetrics *_metrics;
+ vespalib::Monitor _lock;
+ PriorityQueue _queue;
+ LockedBuckets _lockedBuckets;
+ uint32_t _active_merges;
};
struct Disk {
FileStorDiskMetrics * metrics;
@@ -222,7 +223,7 @@ public:
BucketLock(const vespalib::MonitorGuard & guard, Stripe& disk, const document::Bucket &bucket,
uint8_t priority, api::MessageType::Id msgType, api::StorageMessage::Id,
api::LockingRequirements lockReq);
- ~BucketLock();
+ ~BucketLock() override;
const document::Bucket &getBucket() const override { return _bucket; }
api::LockingRequirements lockingRequirements() const noexcept override { return _lockReq; }
@@ -287,20 +288,15 @@ public:
private:
ServiceLayerComponent _component;
- std::vector<Disk> _diskInfo;
- MessageSender& _messageSender;
+ std::vector<Disk> _diskInfo;
+ MessageSender& _messageSender;
const document::BucketIdFactory& _bucketIdFactory;
-
- vespalib::Lock _mergeStatesLock;
-
+ vespalib::Lock _mergeStatesLock;
std::map<document::Bucket, MergeStatus::SP> _mergeStates;
-
- uint32_t _getNextMessageTimeout;
-
- uint32_t _activeMergesSoftLimit;
- mutable std::atomic<uint32_t> _activeMerges;
- vespalib::Monitor _pauseMonitor;
- std::atomic<bool> _paused;
+ uint32_t _getNextMessageTimeout;
+ const uint32_t _max_active_merges_per_stripe; // Read concurrently by stripes.
+ vespalib::Monitor _pauseMonitor;
+ std::atomic<bool> _paused;
void reply(api::StorageMessage&, DiskState state) const;