diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2021-12-03 17:05:44 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-12-03 17:05:44 +0100 |
commit | d40ad68987516db67166e57f2e75bb5dbc91208f (patch) | |
tree | 8edc540e2610198efb22789ffcf986513ea12ce7 /storage | |
parent | ad4219f3c80db0f4448f7028757cd75b7c4028f0 (diff) | |
parent | 844ec9fcd684985cd775ecafaeaf719dd23051da (diff) |
Merge pull request #20359 from vespa-engine/vekterli/decrement-merge-counter-when-sync-merge-handling-complete
Decrement persistence thread merge counter when syncronous processing is complete [run-systemtest]
Diffstat (limited to 'storage')
7 files changed, 109 insertions, 20 deletions
diff --git a/storage/src/tests/persistence/persistencetestutils.h b/storage/src/tests/persistence/persistencetestutils.h index fc986c3c6f2..4bbff9bb2ca 100644 --- a/storage/src/tests/persistence/persistencetestutils.h +++ b/storage/src/tests/persistence/persistencetestutils.h @@ -50,6 +50,8 @@ public: api::LockingRequirements lockingRequirements() const noexcept override { return api::LockingRequirements::Shared; } + void signal_operation_sync_phase_done() noexcept override {} + bool wants_sync_phase_done_notification() const noexcept override { return false; } static std::shared_ptr<NoBucketLock> make(document::Bucket bucket) { return std::make_shared<NoBucketLock>(bucket); } @@ -78,6 +80,8 @@ public: api::LockingRequirements lockingRequirements() const noexcept override { return api::LockingRequirements::Exclusive; } + void signal_operation_sync_phase_done() noexcept override {} + bool wants_sync_phase_done_notification() const noexcept override { return false; } static std::shared_ptr<MockBucketLock> make(document::Bucket bucket, MockBucketLocks& locks) { return std::make_shared<MockBucketLock>(bucket, locks); } diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h index 0d05fd21ce2..a980b5aa2e1 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h @@ -49,14 +49,29 @@ public: {} }; - class BucketLockInterface { + // Interface that is used for "early ACKing" a potentially longer-running async + // operation when the persistence thread processing the operation has completed + // the synchronous aspects of the operation (such as dispatching one or more + // async operations over the SPI). + class OperationSyncPhaseDoneNotifier { public: - using SP = std::shared_ptr<BucketLockInterface>; + virtual ~OperationSyncPhaseDoneNotifier() = default; + + // Informs the caller if the operation wants to know when the persistence thread is + // done with the synchronous aspects of the operation. Returning false allows the caller + // to optimize for the case where this does _not_ need to happen. + [[nodiscard]] virtual bool wants_sync_phase_done_notification() const noexcept = 0; + // Invoked at most once at the point where the persistence thread is done handling the synchronous + // aspects of the operation iff wants_sync_phase_done_notification() was initially true. + virtual void signal_operation_sync_phase_done() noexcept = 0; + }; - virtual const document::Bucket &getBucket() const = 0; - virtual api::LockingRequirements lockingRequirements() const noexcept = 0; + class BucketLockInterface : public OperationSyncPhaseDoneNotifier { + public: + using SP = std::shared_ptr<BucketLockInterface>; - virtual ~BucketLockInterface() = default; + [[nodiscard]] virtual const document::Bucket &getBucket() const = 0; + [[nodiscard]] virtual api::LockingRequirements lockingRequirements() const noexcept = 0; }; using LockedMessage = std::pair<BucketLockInterface::SP, api::StorageMessage::SP>; diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp index f76d2693309..152cda74d9b 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp @@ -1054,7 +1054,8 @@ message_type_is_merge_related(api::MessageType::Id msg_type_id) { void FileStorHandlerImpl::Stripe::release(const document::Bucket & bucket, api::LockingRequirements reqOfReleasedLock, - api::StorageMessage::Id lockMsgId) + api::StorageMessage::Id lockMsgId, + bool was_active_merge) { std::unique_lock guard(*_lock); auto iter = _lockedBuckets.find(bucket); @@ -1065,7 +1066,7 @@ FileStorHandlerImpl::Stripe::release(const document::Bucket & bucket, if (reqOfReleasedLock == api::LockingRequirements::Exclusive) { assert(entry._exclusiveLock); assert(entry._exclusiveLock->msgId == lockMsgId); - if (message_type_is_merge_related(entry._exclusiveLock->msgType)) { + if (was_active_merge) { assert(_active_merges > 0); --_active_merges; } @@ -1089,13 +1090,27 @@ FileStorHandlerImpl::Stripe::release(const document::Bucket & bucket, } void +FileStorHandlerImpl::Stripe::decrease_active_sync_merges_counter() noexcept +{ + std::unique_lock guard(*_lock); + assert(_active_merges > 0); + const bool may_have_blocked_merge = (_active_merges == _owner._max_active_merges_per_stripe); + --_active_merges; + if (may_have_blocked_merge) { + guard.unlock(); + _cond->notify_all(); + } +} + +void FileStorHandlerImpl::Stripe::lock(const monitor_guard &, const document::Bucket & bucket, - api::LockingRequirements lockReq, const LockEntry & lockEntry) { + api::LockingRequirements lockReq, bool count_as_active_merge, + const LockEntry & lockEntry) { auto& entry = _lockedBuckets[bucket]; assert(!entry._exclusiveLock); if (lockReq == api::LockingRequirements::Exclusive) { assert(entry._sharedLocks.empty()); - if (message_type_is_merge_related(lockEntry.msgType)) { + if (count_as_active_merge) { ++_active_merges; } entry._exclusiveLock = lockEntry; @@ -1151,28 +1166,43 @@ FileStorHandlerImpl::Stripe::get_active_operations_stats(bool reset_min_max) con return result; } -FileStorHandlerImpl::BucketLock::BucketLock(const monitor_guard & guard, Stripe& stripe, - const document::Bucket &bucket, uint8_t priority, +FileStorHandlerImpl::BucketLock::BucketLock(const monitor_guard& guard, Stripe& stripe, + const document::Bucket& bucket, uint8_t priority, api::MessageType::Id msgType, api::StorageMessage::Id msgId, api::LockingRequirements lockReq) : _stripe(stripe), _bucket(bucket), _uniqueMsgId(msgId), - _lockReq(lockReq) + _lockReq(lockReq), + _counts_towards_merge_limit(false) { if (_bucket.getBucketId().getRawId() != 0) { - _stripe.lock(guard, _bucket, lockReq, Stripe::LockEntry(priority, msgType, msgId)); + _counts_towards_merge_limit = message_type_is_merge_related(msgType); + _stripe.lock(guard, _bucket, lockReq, _counts_towards_merge_limit, Stripe::LockEntry(priority, msgType, msgId)); LOG(spam, "Locked bucket %s for message %" PRIu64 " with priority %u in mode %s", - bucket.getBucketId().toString().c_str(), msgId, priority, api::to_string(lockReq)); + bucket.toString().c_str(), msgId, priority, api::to_string(lockReq)); } } FileStorHandlerImpl::BucketLock::~BucketLock() { if (_bucket.getBucketId().getRawId() != 0) { - _stripe.release(_bucket, _lockReq, _uniqueMsgId); + _stripe.release(_bucket, _lockReq, _uniqueMsgId, _counts_towards_merge_limit); LOG(spam, "Unlocked bucket %s for message %" PRIu64 " in mode %s", - _bucket.getBucketId().toString().c_str(), _uniqueMsgId, api::to_string(_lockReq)); + _bucket.toString().c_str(), _uniqueMsgId, api::to_string(_lockReq)); + } +} + +void +FileStorHandlerImpl::BucketLock::signal_operation_sync_phase_done() noexcept +{ + // Not atomic, only destructor can read/write this other than this function, and since + // a strong ref must already be held to this object by the caller, we cannot race with it. + if (_counts_towards_merge_limit){ + LOG(spam, "Synchronous phase for bucket %s is done; reducing active count proactively", + _bucket.toString().c_str()); + _stripe.decrease_active_sync_merges_counter(); + _counts_towards_merge_limit = false; } } diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h index 3a89ff74f07..5d68be8a800 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h @@ -115,7 +115,8 @@ public: return _queue->size(); } void release(const document::Bucket & bucket, api::LockingRequirements reqOfReleasedLock, - api::StorageMessage::Id lockMsgId); + api::StorageMessage::Id lockMsgId, bool was_active_merge); + void decrease_active_sync_merges_counter() noexcept; // Subsumes isLocked bool operationIsInhibited(const monitor_guard &, const document::Bucket&, @@ -124,7 +125,8 @@ public: api::LockingRequirements lockReq) const noexcept; void lock(const monitor_guard &, const document::Bucket & bucket, - api::LockingRequirements lockReq, const LockEntry & lockEntry); + api::LockingRequirements lockReq, bool count_as_active_merge, + const LockEntry & lockEntry); std::shared_ptr<FileStorHandler::BucketLockInterface> lock(const document::Bucket & bucket, api::LockingRequirements lockReq); void failOperations(const document::Bucket & bucket, const api::ReturnCode & code); @@ -168,12 +170,17 @@ public: const document::Bucket &getBucket() const override { return _bucket; } api::LockingRequirements lockingRequirements() const noexcept override { return _lockReq; } + void signal_operation_sync_phase_done() noexcept override; + bool wants_sync_phase_done_notification() const noexcept override { + return _counts_towards_merge_limit; + } private: Stripe & _stripe; - document::Bucket _bucket; + const document::Bucket _bucket; api::StorageMessage::Id _uniqueMsgId; api::LockingRequirements _lockReq; + bool _counts_towards_merge_limit; }; diff --git a/storage/src/vespa/storage/persistence/persistencehandler.cpp b/storage/src/vespa/storage/persistence/persistencehandler.cpp index 3c981e193f2..8b546771b71 100644 --- a/storage/src/vespa/storage/persistence/persistencehandler.cpp +++ b/storage/src/vespa/storage/persistence/persistencehandler.cpp @@ -29,9 +29,29 @@ PersistenceHandler::PersistenceHandler(vespalib::ISequencedTaskExecutor & sequen PersistenceHandler::~PersistenceHandler() = default; +// Guard that allows an operation that may be executed in an async fashion to +// be explicitly notified when the sync phase of the operation is done, i.e. +// when the persistence thread is no longer working on it. An operation that +// does not care about such notifications can safely return a nullptr notifier, +// in which case the guard is a no-op. +class OperationSyncPhaseTrackingGuard { + std::shared_ptr<FileStorHandler::OperationSyncPhaseDoneNotifier> _maybe_notifier; +public: + explicit OperationSyncPhaseTrackingGuard(const MessageTracker& tracker) + : _maybe_notifier(tracker.sync_phase_done_notifier_or_nullptr()) + {} + + ~OperationSyncPhaseTrackingGuard() { + if (_maybe_notifier) { + _maybe_notifier->signal_operation_sync_phase_done(); + } + } +}; + MessageTracker::UP PersistenceHandler::handleCommandSplitByType(api::StorageCommand& msg, MessageTracker::UP tracker) const { + OperationSyncPhaseTrackingGuard sync_guard(*tracker); switch (msg.getType().getId()) { case api::MessageType::GET_ID: return _simpleHandler.handleGet(static_cast<api::GetCommand&>(msg), std::move(tracker)); diff --git a/storage/src/vespa/storage/persistence/persistenceutil.cpp b/storage/src/vespa/storage/persistence/persistenceutil.cpp index 3f3d59c11aa..cbfc9463a8c 100644 --- a/storage/src/vespa/storage/persistence/persistenceutil.cpp +++ b/storage/src/vespa/storage/persistence/persistenceutil.cpp @@ -155,6 +155,15 @@ MessageTracker::generateReply(api::StorageCommand& cmd) } } +std::shared_ptr<FileStorHandler::OperationSyncPhaseDoneNotifier> +MessageTracker::sync_phase_done_notifier_or_nullptr() const +{ + if (_bucketLock->wants_sync_phase_done_notification()) { + return _bucketLock; + } + return {}; +} + PersistenceUtil::PersistenceUtil(const ServiceLayerComponent& component, FileStorHandler& fileStorHandler, FileStorThreadMetrics& metrics, spi::PersistenceProvider& provider) : _component(component), diff --git a/storage/src/vespa/storage/persistence/persistenceutil.h b/storage/src/vespa/storage/persistence/persistenceutil.h index 4cc2657ea56..4fd0e60c730 100644 --- a/storage/src/vespa/storage/persistence/persistenceutil.h +++ b/storage/src/vespa/storage/persistence/persistenceutil.h @@ -27,7 +27,7 @@ class PersistenceUtil; class MessageTracker : protected Types { public: - typedef std::unique_ptr<MessageTracker> UP; + using UP = std::unique_ptr<MessageTracker>; MessageTracker(const framework::MilliSecTimer & timer, const PersistenceUtil & env, MessageSender & replySender, FileStorHandler::BucketLockInterface::SP bucketLock, std::shared_ptr<api::StorageMessage> msg); @@ -81,6 +81,10 @@ public: bool checkForError(const spi::Result& response); + // Returns a non-nullptr notifier instance iff the underlying operation wants to be notified + // when the sync phase is complete. Otherwise returns a nullptr shared_ptr. + std::shared_ptr<FileStorHandler::OperationSyncPhaseDoneNotifier> sync_phase_done_notifier_or_nullptr() const; + static MessageTracker::UP createForTesting(const framework::MilliSecTimer & timer, PersistenceUtil & env, MessageSender & replySender, FileStorHandler::BucketLockInterface::SP bucketLock, std::shared_ptr<api::StorageMessage> msg); |