aboutsummaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@yahooinc.com>2021-12-03 15:15:54 +0000
committerTor Brede Vekterli <vekterli@yahooinc.com>2021-12-03 15:25:59 +0000
commit844ec9fcd684985cd775ecafaeaf719dd23051da (patch)
treef39fbc541e1149282abd53e4c161acba90daaf23 /storage
parent546c454a5eecc440c4bb75c528697bbc59770faa (diff)
Decrement persistence thread merge counter when syncronous processing is complete
Add a generic interface for letting an operation know that the synchronous parts of its processing in the persistence thread is complete. This allows a potentially longer-running async operation to free up any limits that were put in place when it was taking up synchronous thread resources. Currently only used by merge-related operations (that may dispatch many async ops). Since we have a max upper bound for how many threads in a stripe may be processing merge ops at the same time (to avoid blocking client ops), we previously could effectively stall the pipelining of merges caused by hitting the concurrency limit even if all persistence threads were otherwise idle (waiting for prior async merge ops to complete). We now explicitly decrease the merge concurrency counter once the synchronous processing is done, allowing us to take on further merges immediately.
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/persistence/persistencetestutils.h4
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandler.h25
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp52
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h13
-rw-r--r--storage/src/vespa/storage/persistence/persistencehandler.cpp20
-rw-r--r--storage/src/vespa/storage/persistence/persistenceutil.cpp9
-rw-r--r--storage/src/vespa/storage/persistence/persistenceutil.h6
7 files changed, 109 insertions, 20 deletions
diff --git a/storage/src/tests/persistence/persistencetestutils.h b/storage/src/tests/persistence/persistencetestutils.h
index fc986c3c6f2..4bbff9bb2ca 100644
--- a/storage/src/tests/persistence/persistencetestutils.h
+++ b/storage/src/tests/persistence/persistencetestutils.h
@@ -50,6 +50,8 @@ public:
api::LockingRequirements lockingRequirements() const noexcept override {
return api::LockingRequirements::Shared;
}
+ void signal_operation_sync_phase_done() noexcept override {}
+ bool wants_sync_phase_done_notification() const noexcept override { return false; }
static std::shared_ptr<NoBucketLock> make(document::Bucket bucket) {
return std::make_shared<NoBucketLock>(bucket);
}
@@ -78,6 +80,8 @@ public:
api::LockingRequirements lockingRequirements() const noexcept override {
return api::LockingRequirements::Exclusive;
}
+ void signal_operation_sync_phase_done() noexcept override {}
+ bool wants_sync_phase_done_notification() const noexcept override { return false; }
static std::shared_ptr<MockBucketLock> make(document::Bucket bucket, MockBucketLocks& locks) {
return std::make_shared<MockBucketLock>(bucket, locks);
}
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
index 0d05fd21ce2..a980b5aa2e1 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
@@ -49,14 +49,29 @@ public:
{}
};
- class BucketLockInterface {
+ // Interface that is used for "early ACKing" a potentially longer-running async
+ // operation when the persistence thread processing the operation has completed
+ // the synchronous aspects of the operation (such as dispatching one or more
+ // async operations over the SPI).
+ class OperationSyncPhaseDoneNotifier {
public:
- using SP = std::shared_ptr<BucketLockInterface>;
+ virtual ~OperationSyncPhaseDoneNotifier() = default;
+
+ // Informs the caller if the operation wants to know when the persistence thread is
+ // done with the synchronous aspects of the operation. Returning false allows the caller
+ // to optimize for the case where this does _not_ need to happen.
+ [[nodiscard]] virtual bool wants_sync_phase_done_notification() const noexcept = 0;
+ // Invoked at most once at the point where the persistence thread is done handling the synchronous
+ // aspects of the operation iff wants_sync_phase_done_notification() was initially true.
+ virtual void signal_operation_sync_phase_done() noexcept = 0;
+ };
- virtual const document::Bucket &getBucket() const = 0;
- virtual api::LockingRequirements lockingRequirements() const noexcept = 0;
+ class BucketLockInterface : public OperationSyncPhaseDoneNotifier {
+ public:
+ using SP = std::shared_ptr<BucketLockInterface>;
- virtual ~BucketLockInterface() = default;
+ [[nodiscard]] virtual const document::Bucket &getBucket() const = 0;
+ [[nodiscard]] virtual api::LockingRequirements lockingRequirements() const noexcept = 0;
};
using LockedMessage = std::pair<BucketLockInterface::SP, api::StorageMessage::SP>;
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
index f76d2693309..152cda74d9b 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
@@ -1054,7 +1054,8 @@ message_type_is_merge_related(api::MessageType::Id msg_type_id) {
void
FileStorHandlerImpl::Stripe::release(const document::Bucket & bucket,
api::LockingRequirements reqOfReleasedLock,
- api::StorageMessage::Id lockMsgId)
+ api::StorageMessage::Id lockMsgId,
+ bool was_active_merge)
{
std::unique_lock guard(*_lock);
auto iter = _lockedBuckets.find(bucket);
@@ -1065,7 +1066,7 @@ FileStorHandlerImpl::Stripe::release(const document::Bucket & bucket,
if (reqOfReleasedLock == api::LockingRequirements::Exclusive) {
assert(entry._exclusiveLock);
assert(entry._exclusiveLock->msgId == lockMsgId);
- if (message_type_is_merge_related(entry._exclusiveLock->msgType)) {
+ if (was_active_merge) {
assert(_active_merges > 0);
--_active_merges;
}
@@ -1089,13 +1090,27 @@ FileStorHandlerImpl::Stripe::release(const document::Bucket & bucket,
}
void
+FileStorHandlerImpl::Stripe::decrease_active_sync_merges_counter() noexcept
+{
+ std::unique_lock guard(*_lock);
+ assert(_active_merges > 0);
+ const bool may_have_blocked_merge = (_active_merges == _owner._max_active_merges_per_stripe);
+ --_active_merges;
+ if (may_have_blocked_merge) {
+ guard.unlock();
+ _cond->notify_all();
+ }
+}
+
+void
FileStorHandlerImpl::Stripe::lock(const monitor_guard &, const document::Bucket & bucket,
- api::LockingRequirements lockReq, const LockEntry & lockEntry) {
+ api::LockingRequirements lockReq, bool count_as_active_merge,
+ const LockEntry & lockEntry) {
auto& entry = _lockedBuckets[bucket];
assert(!entry._exclusiveLock);
if (lockReq == api::LockingRequirements::Exclusive) {
assert(entry._sharedLocks.empty());
- if (message_type_is_merge_related(lockEntry.msgType)) {
+ if (count_as_active_merge) {
++_active_merges;
}
entry._exclusiveLock = lockEntry;
@@ -1151,28 +1166,43 @@ FileStorHandlerImpl::Stripe::get_active_operations_stats(bool reset_min_max) con
return result;
}
-FileStorHandlerImpl::BucketLock::BucketLock(const monitor_guard & guard, Stripe& stripe,
- const document::Bucket &bucket, uint8_t priority,
+FileStorHandlerImpl::BucketLock::BucketLock(const monitor_guard& guard, Stripe& stripe,
+ const document::Bucket& bucket, uint8_t priority,
api::MessageType::Id msgType, api::StorageMessage::Id msgId,
api::LockingRequirements lockReq)
: _stripe(stripe),
_bucket(bucket),
_uniqueMsgId(msgId),
- _lockReq(lockReq)
+ _lockReq(lockReq),
+ _counts_towards_merge_limit(false)
{
if (_bucket.getBucketId().getRawId() != 0) {
- _stripe.lock(guard, _bucket, lockReq, Stripe::LockEntry(priority, msgType, msgId));
+ _counts_towards_merge_limit = message_type_is_merge_related(msgType);
+ _stripe.lock(guard, _bucket, lockReq, _counts_towards_merge_limit, Stripe::LockEntry(priority, msgType, msgId));
LOG(spam, "Locked bucket %s for message %" PRIu64 " with priority %u in mode %s",
- bucket.getBucketId().toString().c_str(), msgId, priority, api::to_string(lockReq));
+ bucket.toString().c_str(), msgId, priority, api::to_string(lockReq));
}
}
FileStorHandlerImpl::BucketLock::~BucketLock() {
if (_bucket.getBucketId().getRawId() != 0) {
- _stripe.release(_bucket, _lockReq, _uniqueMsgId);
+ _stripe.release(_bucket, _lockReq, _uniqueMsgId, _counts_towards_merge_limit);
LOG(spam, "Unlocked bucket %s for message %" PRIu64 " in mode %s",
- _bucket.getBucketId().toString().c_str(), _uniqueMsgId, api::to_string(_lockReq));
+ _bucket.toString().c_str(), _uniqueMsgId, api::to_string(_lockReq));
+ }
+}
+
+void
+FileStorHandlerImpl::BucketLock::signal_operation_sync_phase_done() noexcept
+{
+ // Not atomic, only destructor can read/write this other than this function, and since
+ // a strong ref must already be held to this object by the caller, we cannot race with it.
+ if (_counts_towards_merge_limit){
+ LOG(spam, "Synchronous phase for bucket %s is done; reducing active count proactively",
+ _bucket.toString().c_str());
+ _stripe.decrease_active_sync_merges_counter();
+ _counts_towards_merge_limit = false;
}
}
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
index 3a89ff74f07..5d68be8a800 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
@@ -115,7 +115,8 @@ public:
return _queue->size();
}
void release(const document::Bucket & bucket, api::LockingRequirements reqOfReleasedLock,
- api::StorageMessage::Id lockMsgId);
+ api::StorageMessage::Id lockMsgId, bool was_active_merge);
+ void decrease_active_sync_merges_counter() noexcept;
// Subsumes isLocked
bool operationIsInhibited(const monitor_guard &, const document::Bucket&,
@@ -124,7 +125,8 @@ public:
api::LockingRequirements lockReq) const noexcept;
void lock(const monitor_guard &, const document::Bucket & bucket,
- api::LockingRequirements lockReq, const LockEntry & lockEntry);
+ api::LockingRequirements lockReq, bool count_as_active_merge,
+ const LockEntry & lockEntry);
std::shared_ptr<FileStorHandler::BucketLockInterface> lock(const document::Bucket & bucket, api::LockingRequirements lockReq);
void failOperations(const document::Bucket & bucket, const api::ReturnCode & code);
@@ -168,12 +170,17 @@ public:
const document::Bucket &getBucket() const override { return _bucket; }
api::LockingRequirements lockingRequirements() const noexcept override { return _lockReq; }
+ void signal_operation_sync_phase_done() noexcept override;
+ bool wants_sync_phase_done_notification() const noexcept override {
+ return _counts_towards_merge_limit;
+ }
private:
Stripe & _stripe;
- document::Bucket _bucket;
+ const document::Bucket _bucket;
api::StorageMessage::Id _uniqueMsgId;
api::LockingRequirements _lockReq;
+ bool _counts_towards_merge_limit;
};
diff --git a/storage/src/vespa/storage/persistence/persistencehandler.cpp b/storage/src/vespa/storage/persistence/persistencehandler.cpp
index 3c981e193f2..8b546771b71 100644
--- a/storage/src/vespa/storage/persistence/persistencehandler.cpp
+++ b/storage/src/vespa/storage/persistence/persistencehandler.cpp
@@ -29,9 +29,29 @@ PersistenceHandler::PersistenceHandler(vespalib::ISequencedTaskExecutor & sequen
PersistenceHandler::~PersistenceHandler() = default;
+// Guard that allows an operation that may be executed in an async fashion to
+// be explicitly notified when the sync phase of the operation is done, i.e.
+// when the persistence thread is no longer working on it. An operation that
+// does not care about such notifications can safely return a nullptr notifier,
+// in which case the guard is a no-op.
+class OperationSyncPhaseTrackingGuard {
+ std::shared_ptr<FileStorHandler::OperationSyncPhaseDoneNotifier> _maybe_notifier;
+public:
+ explicit OperationSyncPhaseTrackingGuard(const MessageTracker& tracker)
+ : _maybe_notifier(tracker.sync_phase_done_notifier_or_nullptr())
+ {}
+
+ ~OperationSyncPhaseTrackingGuard() {
+ if (_maybe_notifier) {
+ _maybe_notifier->signal_operation_sync_phase_done();
+ }
+ }
+};
+
MessageTracker::UP
PersistenceHandler::handleCommandSplitByType(api::StorageCommand& msg, MessageTracker::UP tracker) const
{
+ OperationSyncPhaseTrackingGuard sync_guard(*tracker);
switch (msg.getType().getId()) {
case api::MessageType::GET_ID:
return _simpleHandler.handleGet(static_cast<api::GetCommand&>(msg), std::move(tracker));
diff --git a/storage/src/vespa/storage/persistence/persistenceutil.cpp b/storage/src/vespa/storage/persistence/persistenceutil.cpp
index 3f3d59c11aa..cbfc9463a8c 100644
--- a/storage/src/vespa/storage/persistence/persistenceutil.cpp
+++ b/storage/src/vespa/storage/persistence/persistenceutil.cpp
@@ -155,6 +155,15 @@ MessageTracker::generateReply(api::StorageCommand& cmd)
}
}
+std::shared_ptr<FileStorHandler::OperationSyncPhaseDoneNotifier>
+MessageTracker::sync_phase_done_notifier_or_nullptr() const
+{
+ if (_bucketLock->wants_sync_phase_done_notification()) {
+ return _bucketLock;
+ }
+ return {};
+}
+
PersistenceUtil::PersistenceUtil(const ServiceLayerComponent& component, FileStorHandler& fileStorHandler,
FileStorThreadMetrics& metrics, spi::PersistenceProvider& provider)
: _component(component),
diff --git a/storage/src/vespa/storage/persistence/persistenceutil.h b/storage/src/vespa/storage/persistence/persistenceutil.h
index 4cc2657ea56..4fd0e60c730 100644
--- a/storage/src/vespa/storage/persistence/persistenceutil.h
+++ b/storage/src/vespa/storage/persistence/persistenceutil.h
@@ -27,7 +27,7 @@ class PersistenceUtil;
class MessageTracker : protected Types {
public:
- typedef std::unique_ptr<MessageTracker> UP;
+ using UP = std::unique_ptr<MessageTracker>;
MessageTracker(const framework::MilliSecTimer & timer, const PersistenceUtil & env, MessageSender & replySender,
FileStorHandler::BucketLockInterface::SP bucketLock, std::shared_ptr<api::StorageMessage> msg);
@@ -81,6 +81,10 @@ public:
bool checkForError(const spi::Result& response);
+ // Returns a non-nullptr notifier instance iff the underlying operation wants to be notified
+ // when the sync phase is complete. Otherwise returns a nullptr shared_ptr.
+ std::shared_ptr<FileStorHandler::OperationSyncPhaseDoneNotifier> sync_phase_done_notifier_or_nullptr() const;
+
static MessageTracker::UP
createForTesting(const framework::MilliSecTimer & timer, PersistenceUtil & env, MessageSender & replySender,
FileStorHandler::BucketLockInterface::SP bucketLock, std::shared_ptr<api::StorageMessage> msg);