summary | refs | log | tree | commit | diff | stats
path: root/persistence
diff options
context:
space:
mode:
author: Tor Brede Vekterli <vekterli@yahooinc.com>, 2022-05-12 11:02:48 +0000
committer: Tor Brede Vekterli <vekterli@yahooinc.com>, 2022-05-12 13:18:11 +0000
commit: 742d5d49f147ff652d78f5b8d270c81008d96283 (patch)
tree: 68eac9c736e1b20350de6b76a5a1392a84da56d9 /persistence
parent: 32e26201ca86681150eb47661ae551a1c188c594 (diff)
Don't attempt to actually execute document moves from a cancelled bucket mover
This prevents the following race condition, in which the bucket mover logic fails to notify the content layer that the bucket sub DB status has changed for a particular bucket:

1. Bucket state is changed over SPI, a mover is created and registered, and a BucketTask is scheduled onto the persistence queues to actually do the document reads and finalize the move.
2. Before the bucket task is executed, bucket state is changed again over the SPI. A new mover is created, the old one is cancelled (tagging the mover as not consistent) and another BucketTask is scheduled onto the persistence queues. Note: the old task still remains.
3. The old bucket task is executed and performs the actual document moving despite being cancelled. No notification is done towards the content layer since the mover was tagged as not being consistent.
4. The new bucket task is executed and tries to move the same document set as the old mover. Since the documents are no longer present in the source document DB, the moves fail. This tags the mover as inconsistent and no notification is done. The bucket is automatically rechecked, but since all docs are already moved away there is nothing more to do and no subsequent mover is created. This means the "should notify?" edge is not triggered and the content layer remains blissfully unaware of any sub DB changes.

This commit simply changes cancellation to actually inhibit document moves from taking place. This lets the preempting mover successfully complete its moves, thus triggering the notify-edge as expected.
Diffstat (limited to 'persistence')
-rw-r--r-- persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.cpp 51
-rw-r--r-- persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.h 15
2 files changed, 63 insertions, 3 deletions
diff --git a/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.cpp b/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.cpp
index 060215c4521..953cfcf733f 100644
--- a/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.cpp
+++ b/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.cpp
@@ -14,7 +14,9 @@ DummyBucketExecutor::DummyBucketExecutor(size_t numExecutors)
: _executor(std::make_unique<vespalib::ThreadStackExecutor>(numExecutors, 0x10000)),
_lock(),
_cond(),
- _inFlight()
+ _inFlight(),
+ _defer_tasks(false),
+ _deferred_tasks()
{
}
@@ -24,6 +26,15 @@ DummyBucketExecutor::~DummyBucketExecutor() {
void
DummyBucketExecutor::execute(const Bucket & bucket, std::unique_ptr<BucketTask> task) {
+ if (!_defer_tasks) {
+ internal_execute_no_defer(bucket, std::move(task));
+ } else {
+ _deferred_tasks.emplace_back(bucket, std::move(task));
+ }
+}
+
+void
+DummyBucketExecutor::internal_execute_no_defer(const Bucket& bucket, std::unique_ptr<BucketTask> task) {
auto failed = _executor->execute(makeLambdaTask([this, bucket, bucketTask=std::move(task)]() {
{
std::unique_lock guard(_lock);
@@ -45,6 +56,44 @@ DummyBucketExecutor::execute(const Bucket & bucket, std::unique_ptr<BucketTask>
}
void
+DummyBucketExecutor::defer_new_tasks() {
+ std::lock_guard guard(_lock);
+ _defer_tasks = true;
+}
+
+void
+DummyBucketExecutor::schedule_all_deferred_tasks() {
+ DeferredTasks to_run;
+ {
+ std::lock_guard guard(_lock);
+ assert(_defer_tasks);
+ _deferred_tasks.swap(to_run);
+ }
+ for (auto& bucket_and_task : to_run) {
+ internal_execute_no_defer(bucket_and_task.first, std::move(bucket_and_task.second));
+ }
+}
+
+size_t
+DummyBucketExecutor::num_deferred_tasks() const noexcept {
+ std::lock_guard guard(_lock);
+ return _deferred_tasks.size();
+}
+
+void
+DummyBucketExecutor::schedule_single_deferred_task() {
+ std::pair<Bucket, std::unique_ptr<BucketTask>> bucket_and_task;
+ {
+ std::lock_guard guard(_lock);
+ assert(_defer_tasks);
+ assert(!_deferred_tasks.empty());
+ bucket_and_task = std::move(_deferred_tasks.front());
+ _deferred_tasks.pop_front();
+ }
+ internal_execute_no_defer(bucket_and_task.first, std::move(bucket_and_task.second));
+}
+
+void
DummyBucketExecutor::sync() {
_executor->sync();
}
diff --git a/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.h b/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.h
index 86b497437fe..3e1432f7f54 100644
--- a/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.h
+++ b/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.h
@@ -4,6 +4,7 @@
#include <vespa/persistence/spi/bucketexecutor.h>
#include <vespa/vespalib/util/threadexecutor.h>
+#include <deque>
#include <mutex>
#include <condition_variable>
#include <unordered_set>
@@ -19,11 +20,21 @@ public:
~DummyBucketExecutor() override;
void execute(const Bucket & bucket, std::unique_ptr<BucketTask> task) override;
void sync();
+ void defer_new_tasks();
+ [[nodiscard]] size_t num_deferred_tasks() const noexcept;
+ void schedule_single_deferred_task();
+ void schedule_all_deferred_tasks();
private:
+ void internal_execute_no_defer(const Bucket & bucket, std::unique_ptr<BucketTask> task);
+
+ using DeferredTasks = std::deque<std::pair<Bucket, std::unique_ptr<BucketTask>>>;
+
std::unique_ptr<vespalib::SyncableThreadExecutor> _executor;
- std::mutex _lock;
+ mutable std::mutex _lock;
std::condition_variable _cond;
- std::unordered_set<document::Bucket, document::Bucket::hash> _inFlight;
+ std::unordered_set<document::Bucket, document::Bucket::hash> _inFlight;
+ bool _defer_tasks;
+ DeferredTasks _deferred_tasks;
};
}