summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2019-04-05 17:51:15 +0200
committerGitHub <noreply@github.com>2019-04-05 17:51:15 +0200
commitcd5c621d1ee0bf2b735b2f5a18ab0cb1a8ff1285 (patch)
tree9fe05f4b0bc7e6519acd15d4c60003b178d6b41f
parentaa831f6fcbc0e8ab39a9cdaa4d0bb6f899535aea (diff)
parentf9ebc8582e68423803ff752d5b9d11e037e02d5d (diff)
Merge pull request #9036 from vespa-engine/vekterli/limit-persistence-threads-used-for-merge-ops
Limit number of persistence threads that can process merges in parallel
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp6
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandler.h2
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp41
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h7
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp2
5 files changed, 50 insertions, 8 deletions
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp
index 0da0fd5ce66..350cdad791c 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp
@@ -6,14 +6,14 @@ namespace storage {
FileStorHandler::FileStorHandler(MessageSender& sender, FileStorMetrics& metrics,
const spi::PartitionStateList& partitions, ServiceLayerComponentRegister& compReg)
- : _impl(new FileStorHandlerImpl(1, sender, metrics, partitions, compReg))
+ : _impl(new FileStorHandlerImpl(1, 1, sender, metrics, partitions, compReg))
{
}
-FileStorHandler::FileStorHandler(uint32_t numStripes, MessageSender& sender, FileStorMetrics& metrics,
+FileStorHandler::FileStorHandler(uint32_t numThreads, uint32_t numStripes, MessageSender& sender, FileStorMetrics& metrics,
const spi::PartitionStateList& partitions, ServiceLayerComponentRegister& compReg)
- : _impl(new FileStorHandlerImpl(numStripes, sender, metrics, partitions, compReg))
+ : _impl(new FileStorHandlerImpl(numThreads, numStripes, sender, metrics, partitions, compReg))
{
}
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
index 8f0b060d4d6..ab3d03a5e9a 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
@@ -71,7 +71,7 @@ public:
CLOSED
};
- FileStorHandler(uint32_t numStripes, MessageSender&, FileStorMetrics&,
+ FileStorHandler(uint32_t numThreads, uint32_t numStripes, MessageSender&, FileStorMetrics&,
const spi::PartitionStateList&, ServiceLayerComponentRegister&);
FileStorHandler(MessageSender&, FileStorMetrics&,
const spi::PartitionStateList&, ServiceLayerComponentRegister&);
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
index 78498be2510..a56104ff717 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
@@ -22,7 +22,23 @@ using document::BucketSpace;
namespace storage {
-FileStorHandlerImpl::FileStorHandlerImpl(uint32_t numStripes, MessageSender& sender, FileStorMetrics& metrics,
+namespace {
+
+uint32_t merge_soft_limit_from_thread_count(uint32_t num_threads) noexcept {
+ // Rationale: to avoid starving client ops we want to ensure that not all persistence
+ // threads can be blocked by processing merges all at the same time. We therefore allocate
+ // half of the threads to non-merge operations.
+ // This a _soft_ limit since the current operation locking design means there is a small
+ // window of time between when the limit is checked and when its updated. There are no
+ // correctness violations as a consequence of this, but non-merge liveness may be impacted.
+ // There must always be at least 1 thread that can process merges, or the system would stall.
+ return std::max(1u, num_threads / 2);
+}
+
+}
+
+FileStorHandlerImpl::FileStorHandlerImpl(uint32_t numThreads, uint32_t numStripes, MessageSender& sender,
+ FileStorMetrics& metrics,
[[maybe_unused]] const spi::PartitionStateList& partitions,
ServiceLayerComponentRegister& compReg)
: _component(compReg, "filestorhandlerimpl"),
@@ -30,6 +46,8 @@ FileStorHandlerImpl::FileStorHandlerImpl(uint32_t numStripes, MessageSender& sen
_messageSender(sender),
_bucketIdFactory(_component.getBucketIdFactory()),
_getNextMessageTimeout(100),
+ _activeMergesSoftLimit(merge_soft_limit_from_thread_count(numThreads)),
+ _activeMerges(0),
_paused(false)
{
_diskInfo.reserve(_component.getDiskCount());
@@ -926,7 +944,7 @@ FileStorHandlerImpl::Stripe::getNextMessage(uint32_t timeout, Disk & disk)
PriorityIdx& idx(bmi::get<1>(_queue));
PriorityIdx::iterator iter(idx.begin()), end(idx.end());
- while (iter != end && isLocked(guard, iter->_bucket, iter->_command->lockingRequirements())) {
+ while (iter != end && operationIsInhibited(guard, iter->_bucket, *iter->_command)) {
iter++;
}
if (iter != end) {
@@ -1105,6 +1123,10 @@ void FileStorHandlerImpl::Stripe::release(const document::Bucket & bucket,
if (reqOfReleasedLock == api::LockingRequirements::Exclusive) {
assert(entry._exclusiveLock);
assert(entry._exclusiveLock->msgId == lockMsgId);
+ if (entry._exclusiveLock->msgType == api::MessageType::MERGEBUCKET_ID) {
+ auto before = _owner._activeMerges.fetch_sub(1, std::memory_order_relaxed);
+ assert(before > 0);
+ }
entry._exclusiveLock.reset();
} else {
assert(!entry._exclusiveLock);
@@ -1125,6 +1147,9 @@ void FileStorHandlerImpl::Stripe::lock(const vespalib::MonitorGuard &, const doc
assert(!entry._exclusiveLock);
if (lockReq == api::LockingRequirements::Exclusive) {
assert(entry._sharedLocks.empty());
+ if (lockEntry.msgType == api::MessageType::MERGEBUCKET_ID) {
+ _owner._activeMerges.fetch_add(1, std::memory_order_relaxed);
+ }
entry._exclusiveLock = lockEntry;
} else {
// TODO use a hash set with a custom comparator/hasher instead...?
@@ -1154,6 +1179,18 @@ FileStorHandlerImpl::Stripe::isLocked(const vespalib::MonitorGuard &, const docu
&& !iter->second._sharedLocks.empty());
}
+bool
+FileStorHandlerImpl::Stripe::operationIsInhibited(const vespalib::MonitorGuard& guard, const document::Bucket& bucket,
+ const api::StorageMessage& msg) const noexcept
+{
+ if ((msg.getType() == api::MessageType::MERGEBUCKET)
+ && (_owner._activeMerges.load(std::memory_order_relaxed) > _owner._activeMergesSoftLimit))
+ {
+ return true;
+ }
+ return isLocked(guard, bucket, msg.lockingRequirements());
+}
+
uint32_t
FileStorHandlerImpl::Disk::getQueueSize() const noexcept
{
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
index 390b7284b85..d46aa635354 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
@@ -115,6 +115,9 @@ public:
void release(const document::Bucket & bucket, api::LockingRequirements reqOfReleasedLock,
api::StorageMessage::Id lockMsgId);
+ // Subsumes isLocked
+ bool operationIsInhibited(const vespalib::MonitorGuard&, const document::Bucket&,
+ const api::StorageMessage&) const noexcept;
bool isLocked(const vespalib::MonitorGuard &, const document::Bucket&,
api::LockingRequirements lockReq) const noexcept;
@@ -230,7 +233,7 @@ public:
api::LockingRequirements _lockReq;
};
- FileStorHandlerImpl(uint32_t numStripes, MessageSender&, FileStorMetrics&,
+ FileStorHandlerImpl(uint32_t numThreads, uint32_t numStripes, MessageSender&, FileStorMetrics&,
const spi::PartitionStateList&, ServiceLayerComponentRegister&);
~FileStorHandlerImpl();
@@ -293,6 +296,8 @@ private:
uint32_t _getNextMessageTimeout;
+ uint32_t _activeMergesSoftLimit;
+ mutable std::atomic<uint32_t> _activeMerges;
vespalib::Monitor _pauseMonitor;
std::atomic<bool> _paused;
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
index 76e04852178..baa6523cbb2 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
@@ -108,7 +108,7 @@ FileStorManager::configure(std::unique_ptr<vespa::config::content::StorFilestorC
size_t numStripes = std::min(2ul, numThreads);
_metrics->initDiskMetrics(_disks.size(), _component.getLoadTypes()->getMetricLoadTypes(), numStripes, numThreads);
- _filestorHandler.reset(new FileStorHandler(numStripes, *this, *_metrics, _partitions, _compReg));
+ _filestorHandler.reset(new FileStorHandler(numThreads, numStripes, *this, *_metrics, _partitions, _compReg));
for (uint32_t i=0; i<_component.getDiskCount(); ++i) {
if (_partitions[i].isUp()) {
LOG(spam, "Setting up disk %u", i);