aboutsummaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2021-12-03 17:05:44 +0100
committerGitHub <noreply@github.com>2021-12-03 17:05:44 +0100
commitd40ad68987516db67166e57f2e75bb5dbc91208f (patch)
tree8edc540e2610198efb22789ffcf986513ea12ce7 /storage
parentad4219f3c80db0f4448f7028757cd75b7c4028f0 (diff)
parent844ec9fcd684985cd775ecafaeaf719dd23051da (diff)
Merge pull request #20359 from vespa-engine/vekterli/decrement-merge-counter-when-sync-merge-handling-complete
Decrement persistence thread merge counter when syncronous processing is complete [run-systemtest]
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/persistence/persistencetestutils.h4
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandler.h25
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp52
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h13
-rw-r--r--storage/src/vespa/storage/persistence/persistencehandler.cpp20
-rw-r--r--storage/src/vespa/storage/persistence/persistenceutil.cpp9
-rw-r--r--storage/src/vespa/storage/persistence/persistenceutil.h6
7 files changed, 109 insertions, 20 deletions
diff --git a/storage/src/tests/persistence/persistencetestutils.h b/storage/src/tests/persistence/persistencetestutils.h
index fc986c3c6f2..4bbff9bb2ca 100644
--- a/storage/src/tests/persistence/persistencetestutils.h
+++ b/storage/src/tests/persistence/persistencetestutils.h
@@ -50,6 +50,8 @@ public:
api::LockingRequirements lockingRequirements() const noexcept override {
return api::LockingRequirements::Shared;
}
+ void signal_operation_sync_phase_done() noexcept override {}
+ bool wants_sync_phase_done_notification() const noexcept override { return false; }
static std::shared_ptr<NoBucketLock> make(document::Bucket bucket) {
return std::make_shared<NoBucketLock>(bucket);
}
@@ -78,6 +80,8 @@ public:
api::LockingRequirements lockingRequirements() const noexcept override {
return api::LockingRequirements::Exclusive;
}
+ void signal_operation_sync_phase_done() noexcept override {}
+ bool wants_sync_phase_done_notification() const noexcept override { return false; }
static std::shared_ptr<MockBucketLock> make(document::Bucket bucket, MockBucketLocks& locks) {
return std::make_shared<MockBucketLock>(bucket, locks);
}
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
index 0d05fd21ce2..a980b5aa2e1 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
@@ -49,14 +49,29 @@ public:
{}
};
- class BucketLockInterface {
+ // Interface that is used for "early ACKing" a potentially longer-running async
+ // operation when the persistence thread processing the operation has completed
+ // the synchronous aspects of the operation (such as dispatching one or more
+ // async operations over the SPI).
+ class OperationSyncPhaseDoneNotifier {
public:
- using SP = std::shared_ptr<BucketLockInterface>;
+ virtual ~OperationSyncPhaseDoneNotifier() = default;
+
+ // Informs the caller if the operation wants to know when the persistence thread is
+ // done with the synchronous aspects of the operation. Returning false allows the caller
+ // to optimize for the case where this does _not_ need to happen.
+ [[nodiscard]] virtual bool wants_sync_phase_done_notification() const noexcept = 0;
+ // Invoked at most once at the point where the persistence thread is done handling the synchronous
+ // aspects of the operation iff wants_sync_phase_done_notification() was initially true.
+ virtual void signal_operation_sync_phase_done() noexcept = 0;
+ };
- virtual const document::Bucket &getBucket() const = 0;
- virtual api::LockingRequirements lockingRequirements() const noexcept = 0;
+ class BucketLockInterface : public OperationSyncPhaseDoneNotifier {
+ public:
+ using SP = std::shared_ptr<BucketLockInterface>;
- virtual ~BucketLockInterface() = default;
+ [[nodiscard]] virtual const document::Bucket &getBucket() const = 0;
+ [[nodiscard]] virtual api::LockingRequirements lockingRequirements() const noexcept = 0;
};
using LockedMessage = std::pair<BucketLockInterface::SP, api::StorageMessage::SP>;
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
index f76d2693309..152cda74d9b 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
@@ -1054,7 +1054,8 @@ message_type_is_merge_related(api::MessageType::Id msg_type_id) {
void
FileStorHandlerImpl::Stripe::release(const document::Bucket & bucket,
api::LockingRequirements reqOfReleasedLock,
- api::StorageMessage::Id lockMsgId)
+ api::StorageMessage::Id lockMsgId,
+ bool was_active_merge)
{
std::unique_lock guard(*_lock);
auto iter = _lockedBuckets.find(bucket);
@@ -1065,7 +1066,7 @@ FileStorHandlerImpl::Stripe::release(const document::Bucket & bucket,
if (reqOfReleasedLock == api::LockingRequirements::Exclusive) {
assert(entry._exclusiveLock);
assert(entry._exclusiveLock->msgId == lockMsgId);
- if (message_type_is_merge_related(entry._exclusiveLock->msgType)) {
+ if (was_active_merge) {
assert(_active_merges > 0);
--_active_merges;
}
@@ -1089,13 +1090,27 @@ FileStorHandlerImpl::Stripe::release(const document::Bucket & bucket,
}
void
+FileStorHandlerImpl::Stripe::decrease_active_sync_merges_counter() noexcept
+{
+ std::unique_lock guard(*_lock);
+ assert(_active_merges > 0);
+ const bool may_have_blocked_merge = (_active_merges == _owner._max_active_merges_per_stripe);
+ --_active_merges;
+ if (may_have_blocked_merge) {
+ guard.unlock();
+ _cond->notify_all();
+ }
+}
+
+void
FileStorHandlerImpl::Stripe::lock(const monitor_guard &, const document::Bucket & bucket,
- api::LockingRequirements lockReq, const LockEntry & lockEntry) {
+ api::LockingRequirements lockReq, bool count_as_active_merge,
+ const LockEntry & lockEntry) {
auto& entry = _lockedBuckets[bucket];
assert(!entry._exclusiveLock);
if (lockReq == api::LockingRequirements::Exclusive) {
assert(entry._sharedLocks.empty());
- if (message_type_is_merge_related(lockEntry.msgType)) {
+ if (count_as_active_merge) {
++_active_merges;
}
entry._exclusiveLock = lockEntry;
@@ -1151,28 +1166,43 @@ FileStorHandlerImpl::Stripe::get_active_operations_stats(bool reset_min_max) con
return result;
}
-FileStorHandlerImpl::BucketLock::BucketLock(const monitor_guard & guard, Stripe& stripe,
- const document::Bucket &bucket, uint8_t priority,
+FileStorHandlerImpl::BucketLock::BucketLock(const monitor_guard& guard, Stripe& stripe,
+ const document::Bucket& bucket, uint8_t priority,
api::MessageType::Id msgType, api::StorageMessage::Id msgId,
api::LockingRequirements lockReq)
: _stripe(stripe),
_bucket(bucket),
_uniqueMsgId(msgId),
- _lockReq(lockReq)
+ _lockReq(lockReq),
+ _counts_towards_merge_limit(false)
{
if (_bucket.getBucketId().getRawId() != 0) {
- _stripe.lock(guard, _bucket, lockReq, Stripe::LockEntry(priority, msgType, msgId));
+ _counts_towards_merge_limit = message_type_is_merge_related(msgType);
+ _stripe.lock(guard, _bucket, lockReq, _counts_towards_merge_limit, Stripe::LockEntry(priority, msgType, msgId));
LOG(spam, "Locked bucket %s for message %" PRIu64 " with priority %u in mode %s",
- bucket.getBucketId().toString().c_str(), msgId, priority, api::to_string(lockReq));
+ bucket.toString().c_str(), msgId, priority, api::to_string(lockReq));
}
}
FileStorHandlerImpl::BucketLock::~BucketLock() {
if (_bucket.getBucketId().getRawId() != 0) {
- _stripe.release(_bucket, _lockReq, _uniqueMsgId);
+ _stripe.release(_bucket, _lockReq, _uniqueMsgId, _counts_towards_merge_limit);
LOG(spam, "Unlocked bucket %s for message %" PRIu64 " in mode %s",
- _bucket.getBucketId().toString().c_str(), _uniqueMsgId, api::to_string(_lockReq));
+ _bucket.toString().c_str(), _uniqueMsgId, api::to_string(_lockReq));
+ }
+}
+
+void
+FileStorHandlerImpl::BucketLock::signal_operation_sync_phase_done() noexcept
+{
+ // Not atomic, only destructor can read/write this other than this function, and since
+ // a strong ref must already be held to this object by the caller, we cannot race with it.
+ if (_counts_towards_merge_limit){
+ LOG(spam, "Synchronous phase for bucket %s is done; reducing active count proactively",
+ _bucket.toString().c_str());
+ _stripe.decrease_active_sync_merges_counter();
+ _counts_towards_merge_limit = false;
}
}
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
index 3a89ff74f07..5d68be8a800 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
@@ -115,7 +115,8 @@ public:
return _queue->size();
}
void release(const document::Bucket & bucket, api::LockingRequirements reqOfReleasedLock,
- api::StorageMessage::Id lockMsgId);
+ api::StorageMessage::Id lockMsgId, bool was_active_merge);
+ void decrease_active_sync_merges_counter() noexcept;
// Subsumes isLocked
bool operationIsInhibited(const monitor_guard &, const document::Bucket&,
@@ -124,7 +125,8 @@ public:
api::LockingRequirements lockReq) const noexcept;
void lock(const monitor_guard &, const document::Bucket & bucket,
- api::LockingRequirements lockReq, const LockEntry & lockEntry);
+ api::LockingRequirements lockReq, bool count_as_active_merge,
+ const LockEntry & lockEntry);
std::shared_ptr<FileStorHandler::BucketLockInterface> lock(const document::Bucket & bucket, api::LockingRequirements lockReq);
void failOperations(const document::Bucket & bucket, const api::ReturnCode & code);
@@ -168,12 +170,17 @@ public:
const document::Bucket &getBucket() const override { return _bucket; }
api::LockingRequirements lockingRequirements() const noexcept override { return _lockReq; }
+ void signal_operation_sync_phase_done() noexcept override;
+ bool wants_sync_phase_done_notification() const noexcept override {
+ return _counts_towards_merge_limit;
+ }
private:
Stripe & _stripe;
- document::Bucket _bucket;
+ const document::Bucket _bucket;
api::StorageMessage::Id _uniqueMsgId;
api::LockingRequirements _lockReq;
+ bool _counts_towards_merge_limit;
};
diff --git a/storage/src/vespa/storage/persistence/persistencehandler.cpp b/storage/src/vespa/storage/persistence/persistencehandler.cpp
index 3c981e193f2..8b546771b71 100644
--- a/storage/src/vespa/storage/persistence/persistencehandler.cpp
+++ b/storage/src/vespa/storage/persistence/persistencehandler.cpp
@@ -29,9 +29,29 @@ PersistenceHandler::PersistenceHandler(vespalib::ISequencedTaskExecutor & sequen
PersistenceHandler::~PersistenceHandler() = default;
+// Guard that allows an operation that may be executed in an async fashion to
+// be explicitly notified when the sync phase of the operation is done, i.e.
+// when the persistence thread is no longer working on it. An operation that
+// does not care about such notifications can safely return a nullptr notifier,
+// in which case the guard is a no-op.
+class OperationSyncPhaseTrackingGuard {
+ std::shared_ptr<FileStorHandler::OperationSyncPhaseDoneNotifier> _maybe_notifier;
+public:
+ explicit OperationSyncPhaseTrackingGuard(const MessageTracker& tracker)
+ : _maybe_notifier(tracker.sync_phase_done_notifier_or_nullptr())
+ {}
+
+ ~OperationSyncPhaseTrackingGuard() {
+ if (_maybe_notifier) {
+ _maybe_notifier->signal_operation_sync_phase_done();
+ }
+ }
+};
+
MessageTracker::UP
PersistenceHandler::handleCommandSplitByType(api::StorageCommand& msg, MessageTracker::UP tracker) const
{
+ OperationSyncPhaseTrackingGuard sync_guard(*tracker);
switch (msg.getType().getId()) {
case api::MessageType::GET_ID:
return _simpleHandler.handleGet(static_cast<api::GetCommand&>(msg), std::move(tracker));
diff --git a/storage/src/vespa/storage/persistence/persistenceutil.cpp b/storage/src/vespa/storage/persistence/persistenceutil.cpp
index 3f3d59c11aa..cbfc9463a8c 100644
--- a/storage/src/vespa/storage/persistence/persistenceutil.cpp
+++ b/storage/src/vespa/storage/persistence/persistenceutil.cpp
@@ -155,6 +155,15 @@ MessageTracker::generateReply(api::StorageCommand& cmd)
}
}
+std::shared_ptr<FileStorHandler::OperationSyncPhaseDoneNotifier>
+MessageTracker::sync_phase_done_notifier_or_nullptr() const
+{
+ if (_bucketLock->wants_sync_phase_done_notification()) {
+ return _bucketLock;
+ }
+ return {};
+}
+
PersistenceUtil::PersistenceUtil(const ServiceLayerComponent& component, FileStorHandler& fileStorHandler,
FileStorThreadMetrics& metrics, spi::PersistenceProvider& provider)
: _component(component),
diff --git a/storage/src/vespa/storage/persistence/persistenceutil.h b/storage/src/vespa/storage/persistence/persistenceutil.h
index 4cc2657ea56..4fd0e60c730 100644
--- a/storage/src/vespa/storage/persistence/persistenceutil.h
+++ b/storage/src/vespa/storage/persistence/persistenceutil.h
@@ -27,7 +27,7 @@ class PersistenceUtil;
class MessageTracker : protected Types {
public:
- typedef std::unique_ptr<MessageTracker> UP;
+ using UP = std::unique_ptr<MessageTracker>;
MessageTracker(const framework::MilliSecTimer & timer, const PersistenceUtil & env, MessageSender & replySender,
FileStorHandler::BucketLockInterface::SP bucketLock, std::shared_ptr<api::StorageMessage> msg);
@@ -81,6 +81,10 @@ public:
bool checkForError(const spi::Result& response);
+ // Returns a non-nullptr notifier instance iff the underlying operation wants to be notified
+ // when the sync phase is complete. Otherwise returns a nullptr shared_ptr.
+ std::shared_ptr<FileStorHandler::OperationSyncPhaseDoneNotifier> sync_phase_done_notifier_or_nullptr() const;
+
static MessageTracker::UP
createForTesting(const framework::MilliSecTimer & timer, PersistenceUtil & env, MessageSender & replySender,
FileStorHandler::BucketLockInterface::SP bucketLock, std::shared_ptr<api::StorageMessage> msg);