From f9ebc8582e68423803ff752d5b9d11e037e02d5d Mon Sep 17 00:00:00 2001 From: Tor Brede Vekterli Date: Wed, 27 Mar 2019 14:07:32 +0000 Subject: Limit number of persistence threads that can process merges in parallel Avoids starving other operations when there is a lot of merge activity taking place. For now, 1/2 of the total persistence thread pool may process merges. --- .../persistence/filestorage/filestorhandler.cpp | 6 ++-- .../persistence/filestorage/filestorhandler.h | 2 +- .../filestorage/filestorhandlerimpl.cpp | 41 ++++++++++++++++++++-- .../persistence/filestorage/filestorhandlerimpl.h | 7 +++- .../persistence/filestorage/filestormanager.cpp | 2 +- 5 files changed, 50 insertions(+), 8 deletions(-) (limited to 'storage') diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp index 0da0fd5ce66..350cdad791c 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp @@ -6,14 +6,14 @@ namespace storage { FileStorHandler::FileStorHandler(MessageSender& sender, FileStorMetrics& metrics, const spi::PartitionStateList& partitions, ServiceLayerComponentRegister& compReg) - : _impl(new FileStorHandlerImpl(1, sender, metrics, partitions, compReg)) + : _impl(new FileStorHandlerImpl(1, 1, sender, metrics, partitions, compReg)) { } -FileStorHandler::FileStorHandler(uint32_t numStripes, MessageSender& sender, FileStorMetrics& metrics, +FileStorHandler::FileStorHandler(uint32_t numThreads, uint32_t numStripes, MessageSender& sender, FileStorMetrics& metrics, const spi::PartitionStateList& partitions, ServiceLayerComponentRegister& compReg) - : _impl(new FileStorHandlerImpl(numStripes, sender, metrics, partitions, compReg)) + : _impl(new FileStorHandlerImpl(numThreads, numStripes, sender, metrics, partitions, compReg)) { } diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h index 8f0b060d4d6..ab3d03a5e9a 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h @@ -71,7 +71,7 @@ public: CLOSED }; - FileStorHandler(uint32_t numStripes, MessageSender&, FileStorMetrics&, + FileStorHandler(uint32_t numThreads, uint32_t numStripes, MessageSender&, FileStorMetrics&, const spi::PartitionStateList&, ServiceLayerComponentRegister&); FileStorHandler(MessageSender&, FileStorMetrics&, const spi::PartitionStateList&, ServiceLayerComponentRegister&); diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp index 78498be2510..a56104ff717 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp @@ -22,7 +22,23 @@ using document::BucketSpace; namespace storage { -FileStorHandlerImpl::FileStorHandlerImpl(uint32_t numStripes, MessageSender& sender, FileStorMetrics& metrics, +namespace { + +uint32_t merge_soft_limit_from_thread_count(uint32_t num_threads) noexcept { + // Rationale: to avoid starving client ops we want to ensure that not all persistence + // threads can be blocked by processing merges all at the same time. We therefore allocate + // half of the threads to non-merge operations. + // This a _soft_ limit since the current operation locking design means there is a small + // window of time between when the limit is checked and when its updated. There are no + // correctness violations as a consequence of this, but non-merge liveness may be impacted. + // There must always be at least 1 thread that can process merges, or the system would stall. + return std::max(1u, num_threads / 2); +} + +} + +FileStorHandlerImpl::FileStorHandlerImpl(uint32_t numThreads, uint32_t numStripes, MessageSender& sender, + FileStorMetrics& metrics, [[maybe_unused]] const spi::PartitionStateList& partitions, ServiceLayerComponentRegister& compReg) : _component(compReg, "filestorhandlerimpl"), @@ -30,6 +46,8 @@ FileStorHandlerImpl::FileStorHandlerImpl(uint32_t numStripes, MessageSender& sen _messageSender(sender), _bucketIdFactory(_component.getBucketIdFactory()), _getNextMessageTimeout(100), + _activeMergesSoftLimit(merge_soft_limit_from_thread_count(numThreads)), + _activeMerges(0), _paused(false) { _diskInfo.reserve(_component.getDiskCount()); @@ -926,7 +944,7 @@ FileStorHandlerImpl::Stripe::getNextMessage(uint32_t timeout, Disk & disk) PriorityIdx& idx(bmi::get<1>(_queue)); PriorityIdx::iterator iter(idx.begin()), end(idx.end()); - while (iter != end && isLocked(guard, iter->_bucket, iter->_command->lockingRequirements())) { + while (iter != end && operationIsInhibited(guard, iter->_bucket, *iter->_command)) { iter++; } if (iter != end) { @@ -1105,6 +1123,10 @@ void FileStorHandlerImpl::Stripe::release(const document::Bucket & bucket, if (reqOfReleasedLock == api::LockingRequirements::Exclusive) { assert(entry._exclusiveLock); assert(entry._exclusiveLock->msgId == lockMsgId); + if (entry._exclusiveLock->msgType == api::MessageType::MERGEBUCKET_ID) { + auto before = _owner._activeMerges.fetch_sub(1, std::memory_order_relaxed); + assert(before > 0); + } entry._exclusiveLock.reset(); } else { assert(!entry._exclusiveLock); @@ -1125,6 +1147,9 @@ void FileStorHandlerImpl::Stripe::lock(const vespalib::MonitorGuard &, const doc assert(!entry._exclusiveLock); if (lockReq == api::LockingRequirements::Exclusive) { assert(entry._sharedLocks.empty()); + if (lockEntry.msgType == api::MessageType::MERGEBUCKET_ID) { + _owner._activeMerges.fetch_add(1, std::memory_order_relaxed); + } entry._exclusiveLock = lockEntry; } else { // TODO use a hash set with a custom comparator/hasher instead...? @@ -1154,6 +1179,18 @@ FileStorHandlerImpl::Stripe::isLocked(const vespalib::MonitorGuard &, const docu && !iter->second._sharedLocks.empty()); } +bool +FileStorHandlerImpl::Stripe::operationIsInhibited(const vespalib::MonitorGuard& guard, const document::Bucket& bucket, + const api::StorageMessage& msg) const noexcept +{ + if ((msg.getType() == api::MessageType::MERGEBUCKET) + && (_owner._activeMerges.load(std::memory_order_relaxed) > _owner._activeMergesSoftLimit)) + { + return true; + } + return isLocked(guard, bucket, msg.lockingRequirements()); +} + uint32_t FileStorHandlerImpl::Disk::getQueueSize() const noexcept { diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h index 390b7284b85..d46aa635354 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h @@ -115,6 +115,9 @@ public: void release(const document::Bucket & bucket, api::LockingRequirements reqOfReleasedLock, api::StorageMessage::Id lockMsgId); + // Subsumes isLocked + bool operationIsInhibited(const vespalib::MonitorGuard&, const document::Bucket&, + const api::StorageMessage&) const noexcept; bool isLocked(const vespalib::MonitorGuard &, const document::Bucket&, api::LockingRequirements lockReq) const noexcept; @@ -230,7 +233,7 @@ public: api::LockingRequirements _lockReq; }; - FileStorHandlerImpl(uint32_t numStripes, MessageSender&, FileStorMetrics&, + FileStorHandlerImpl(uint32_t numThreads, uint32_t numStripes, MessageSender&, FileStorMetrics&, const spi::PartitionStateList&, ServiceLayerComponentRegister&); ~FileStorHandlerImpl(); @@ -293,6 +296,8 @@ private: uint32_t _getNextMessageTimeout; + uint32_t _activeMergesSoftLimit; + mutable std::atomic _activeMerges; vespalib::Monitor _pauseMonitor; std::atomic _paused; diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp index 76e04852178..baa6523cbb2 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp @@ -108,7 +108,7 @@ FileStorManager::configure(std::unique_ptrinitDiskMetrics(_disks.size(), _component.getLoadTypes()->getMetricLoadTypes(), numStripes, numThreads); - _filestorHandler.reset(new FileStorHandler(numStripes, *this, *_metrics, _partitions, _compReg)); + _filestorHandler.reset(new FileStorHandler(numThreads, numStripes, *this, *_metrics, _partitions, _compReg)); for (uint32_t i=0; i<_component.getDiskCount(); ++i) { if (_partitions[i].isUp()) { LOG(spam, "Setting up disk %u", i); -- cgit v1.2.3