diff options
author | Tor Brede Vekterli <vekterli@yahooinc.com> | 2022-05-12 11:02:48 +0000 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@yahooinc.com> | 2022-05-12 13:18:11 +0000 |
commit | 742d5d49f147ff652d78f5b8d270c81008d96283 (patch) | |
tree | 68eac9c736e1b20350de6b76a5a1392a84da56d9 /persistence | |
parent | 32e26201ca86681150eb47661ae551a1c188c594 (diff) |
Don't attempt to actually execute document moves from a cancelled bucket mover
This prevents the following race condition where the bucket mover logic
fails to notify the content layer that the bucket sub DB status has changed
for a particular bucket:
1. Bucket state is changed over SPI, a mover is created and registered and
a BucketTask is scheduled onto the persistence queues to actually do the
document reads and finalize the move.
2. Before the bucket task is executed, bucket state is changed again over
the SPI. A new mover is created, the old one is cancelled (tagging
mover as not consistent) and another BucketTask is scheduled onto
the persistence queues. Note: the old task still remains.
3. Old bucket task is executed and performs the actual document moving
despite being cancelled. No notification is done towards the content
layer since the mover was tagged as not being consistent.
4. New bucket task is executed and tries to move the same document set
as the old mover. Since the documents are no longer present in the
source document DB, the moves fail. This tags the mover as inconsistent
and no notification is done. Bucket is automatically rechecked, but
since all docs are already moved away there is nothing more to do
and no subsequent mover is created. This means the "should notify?"
edge is not triggered and the content layer remains blissfully unaware
of any sub DB changes.
This commit simply changes cancellation to actually inhibit document moves
from taking place. This lets the preempting mover successfully complete its
moves, thus triggering the notify-edge as expected.
Diffstat (limited to 'persistence')
-rw-r--r-- | persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.cpp | 51 | ||||
-rw-r--r-- | persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.h | 15 |
2 files changed, 63 insertions, 3 deletions
diff --git a/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.cpp b/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.cpp index 060215c4521..953cfcf733f 100644 --- a/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.cpp +++ b/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.cpp @@ -14,7 +14,9 @@ DummyBucketExecutor::DummyBucketExecutor(size_t numExecutors) : _executor(std::make_unique<vespalib::ThreadStackExecutor>(numExecutors, 0x10000)), _lock(), _cond(), - _inFlight() + _inFlight(), + _defer_tasks(false), + _deferred_tasks() { } @@ -24,6 +26,15 @@ DummyBucketExecutor::~DummyBucketExecutor() { void DummyBucketExecutor::execute(const Bucket & bucket, std::unique_ptr<BucketTask> task) { + if (!_defer_tasks) { + internal_execute_no_defer(bucket, std::move(task)); + } else { + _deferred_tasks.emplace_back(bucket, std::move(task)); + } +} + +void +DummyBucketExecutor::internal_execute_no_defer(const Bucket& bucket, std::unique_ptr<BucketTask> task) { auto failed = _executor->execute(makeLambdaTask([this, bucket, bucketTask=std::move(task)]() { { std::unique_lock guard(_lock); @@ -45,6 +56,44 @@ DummyBucketExecutor::execute(const Bucket & bucket, std::unique_ptr<BucketTask> } void +DummyBucketExecutor::defer_new_tasks() { + std::lock_guard guard(_lock); + _defer_tasks = true; +} + +void +DummyBucketExecutor::schedule_all_deferred_tasks() { + DeferredTasks to_run; + { + std::lock_guard guard(_lock); + assert(_defer_tasks); + _deferred_tasks.swap(to_run); + } + for (auto& bucket_and_task : to_run) { + internal_execute_no_defer(bucket_and_task.first, std::move(bucket_and_task.second)); + } +} + +size_t +DummyBucketExecutor::num_deferred_tasks() const noexcept { + std::lock_guard guard(_lock); + return _deferred_tasks.size(); +} + +void +DummyBucketExecutor::schedule_single_deferred_task() { + std::pair<Bucket, std::unique_ptr<BucketTask>> bucket_and_task; + { + std::lock_guard guard(_lock); + assert(_defer_tasks); + assert(!_deferred_tasks.empty()); + bucket_and_task = std::move(_deferred_tasks.front()); + _deferred_tasks.pop_front(); + } + internal_execute_no_defer(bucket_and_task.first, std::move(bucket_and_task.second)); +} + +void DummyBucketExecutor::sync() { _executor->sync(); } diff --git a/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.h b/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.h index 86b497437fe..3e1432f7f54 100644 --- a/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.h +++ b/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.h @@ -4,6 +4,7 @@ #include <vespa/persistence/spi/bucketexecutor.h> #include <vespa/vespalib/util/threadexecutor.h> +#include <deque> #include <mutex> #include <condition_variable> #include <unordered_set> @@ -19,11 +20,21 @@ public: ~DummyBucketExecutor() override; void execute(const Bucket & bucket, std::unique_ptr<BucketTask> task) override; void sync(); + void defer_new_tasks(); + [[nodiscard]] size_t num_deferred_tasks() const noexcept; + void schedule_single_deferred_task(); + void schedule_all_deferred_tasks(); private: + void internal_execute_no_defer(const Bucket & bucket, std::unique_ptr<BucketTask> task); + + using DeferredTasks = std::deque<std::pair<Bucket, std::unique_ptr<BucketTask>>>; + std::unique_ptr<vespalib::SyncableThreadExecutor> _executor; - std::mutex _lock; + mutable std::mutex _lock; std::condition_variable _cond; - std::unordered_set<document::Bucket, document::Bucket::hash> _inFlight; + std::unordered_set<document::Bucket, document::Bucket::hash> _inFlight; + bool _defer_tasks; + DeferredTasks _deferred_tasks; }; } |