diff options
author | Tor Brede Vekterli <vekterli@yahooinc.com> | 2022-05-12 11:02:48 +0000 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@yahooinc.com> | 2022-05-12 13:18:11 +0000 |
commit | 742d5d49f147ff652d78f5b8d270c81008d96283 (patch) | |
tree | 68eac9c736e1b20350de6b76a5a1392a84da56d9 | |
parent | 32e26201ca86681150eb47661ae551a1c188c594 (diff) |
Don't attempt to actually execute document moves from a cancelled bucket mover
This prevents the following race condition where the bucket mover logic
fails to notify the content layer that the bucket sub DB status has changed
for a particular bucket:
1. Bucket state is changed over SPI, a mover is created and registered and
a BucketTask is scheduled onto the persistence queues to actually do the
document reads and finalize the move.
2. Before the bucket task is executed, bucket state is changed again over
the SPI. A new mover is created, the old one is cancelled (tagging
the old mover as not consistent) and another BucketTask is scheduled onto
the persistence queues. Note: the old task still remains in the queue.
3. Old bucket task is executed and performs the actual document moving
despite its mover having been cancelled. No notification is done towards
the content layer since the mover was tagged as not being consistent.
4. New bucket task is executed and tries to move the same document set
as the old mover. Since the documents are no longer present in the
source document DB, the moves fail. This tags the mover as inconsistent
and no notification is done. Bucket is automatically rechecked, but
since all docs are already moved away there is nothing more to do
and no subsequent mover is created. This means the "should notify?"
edge is not triggered and the content layer remains blissfully unaware
of any sub DB changes.
This commit simply changes cancellation to actually inhibit document moves
from taking place. This lets the preempting mover successfully complete its
moves, thus triggering the notify-edge as expected.
6 files changed, 106 insertions, 6 deletions
diff --git a/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.cpp b/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.cpp index 060215c4521..953cfcf733f 100644 --- a/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.cpp +++ b/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.cpp @@ -14,7 +14,9 @@ DummyBucketExecutor::DummyBucketExecutor(size_t numExecutors) : _executor(std::make_unique<vespalib::ThreadStackExecutor>(numExecutors, 0x10000)), _lock(), _cond(), - _inFlight() + _inFlight(), + _defer_tasks(false), + _deferred_tasks() { } @@ -24,6 +26,15 @@ DummyBucketExecutor::~DummyBucketExecutor() { void DummyBucketExecutor::execute(const Bucket & bucket, std::unique_ptr<BucketTask> task) { + if (!_defer_tasks) { + internal_execute_no_defer(bucket, std::move(task)); + } else { + _deferred_tasks.emplace_back(bucket, std::move(task)); + } +} + +void +DummyBucketExecutor::internal_execute_no_defer(const Bucket& bucket, std::unique_ptr<BucketTask> task) { auto failed = _executor->execute(makeLambdaTask([this, bucket, bucketTask=std::move(task)]() { { std::unique_lock guard(_lock); @@ -45,6 +56,44 @@ DummyBucketExecutor::execute(const Bucket & bucket, std::unique_ptr<BucketTask> } void +DummyBucketExecutor::defer_new_tasks() { + std::lock_guard guard(_lock); + _defer_tasks = true; +} + +void +DummyBucketExecutor::schedule_all_deferred_tasks() { + DeferredTasks to_run; + { + std::lock_guard guard(_lock); + assert(_defer_tasks); + _deferred_tasks.swap(to_run); + } + for (auto& bucket_and_task : to_run) { + internal_execute_no_defer(bucket_and_task.first, std::move(bucket_and_task.second)); + } +} + +size_t +DummyBucketExecutor::num_deferred_tasks() const noexcept { + std::lock_guard guard(_lock); + return _deferred_tasks.size(); +} + +void +DummyBucketExecutor::schedule_single_deferred_task() { + std::pair<Bucket, std::unique_ptr<BucketTask>> bucket_and_task; + { + std::lock_guard guard(_lock); + 
assert(_defer_tasks); + assert(!_deferred_tasks.empty()); + bucket_and_task = std::move(_deferred_tasks.front()); + _deferred_tasks.pop_front(); + } + internal_execute_no_defer(bucket_and_task.first, std::move(bucket_and_task.second)); +} + +void DummyBucketExecutor::sync() { _executor->sync(); } diff --git a/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.h b/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.h index 86b497437fe..3e1432f7f54 100644 --- a/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.h +++ b/persistence/src/vespa/persistence/dummyimpl/dummy_bucket_executor.h @@ -4,6 +4,7 @@ #include <vespa/persistence/spi/bucketexecutor.h> #include <vespa/vespalib/util/threadexecutor.h> +#include <deque> #include <mutex> #include <condition_variable> #include <unordered_set> @@ -19,11 +20,21 @@ public: ~DummyBucketExecutor() override; void execute(const Bucket & bucket, std::unique_ptr<BucketTask> task) override; void sync(); + void defer_new_tasks(); + [[nodiscard]] size_t num_deferred_tasks() const noexcept; + void schedule_single_deferred_task(); + void schedule_all_deferred_tasks(); private: + void internal_execute_no_defer(const Bucket & bucket, std::unique_ptr<BucketTask> task); + + using DeferredTasks = std::deque<std::pair<Bucket, std::unique_ptr<BucketTask>>>; + std::unique_ptr<vespalib::SyncableThreadExecutor> _executor; - std::mutex _lock; + mutable std::mutex _lock; std::condition_variable _cond; - std::unordered_set<document::Bucket, document::Bucket::hash> _inFlight; + std::unordered_set<document::Bucket, document::Bucket::hash> _inFlight; + bool _defer_tasks; + DeferredTasks _deferred_tasks; }; } diff --git a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp index 9bc374b8386..2db34e45140 100644 --- 
a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp +++ b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp @@ -294,7 +294,7 @@ TEST_F(ControllerFixture, require_that_we_move_buckets_in_several_steps) EXPECT_EQ(0, numPending()); EXPECT_EQ(7u, docsMoved().size()); EXPECT_EQ(3u, bucketsModified().size()); - EXPECT_EQ(_ready.bucket(2), bucketsModified()[0]); + ASSERT_EQ(_ready.bucket(2), bucketsModified()[0]); EXPECT_EQ(_notReady.bucket(3), bucketsModified()[1]); EXPECT_EQ(_notReady.bucket(4), bucketsModified()[2]); } @@ -481,6 +481,36 @@ TEST_F(ControllerFixture, explicitly_active_not_ready_bucket_can_be_moved_to_rea EXPECT_EQ(_notReady.bucket(3), bucketsModified()[0]); } +TEST_F(ControllerFixture, bucket_change_notification_is_not_lost_with_concurrent_bucket_movers) +{ + addReady(_ready.bucket(1)); + _bmj->recompute(); // Bucket 1 should be (and is) ready, bucket 2 is ready (but should not be). + _bucketExecutor.defer_new_tasks(); // Don't execute immediately, we need to force multiple pending moves + masterExecute([this]() { + deactivateBucket(_ready.bucket(2)); + _bmj->scanAndMove(4, 3); + // New deactivation received from above prior to completion of scan. This can happen since + // moves are asynchronous and the distributor can send new (de-)activations before the old move is done. + // In our case, we've enforced that another move is already pending in the bucket executor. + deactivateBucket(_ready.bucket(2)); + _bmj->scanAndMove(4, 3); + }); + sync(); + ASSERT_EQ(_bucketExecutor.num_deferred_tasks(), 2u); + _bucketExecutor.schedule_single_deferred_task(); + sync(); + // We have to fake that moving a document marks it as not found in the source sub DB. + // This doesn't automatically happen when using mocks. The most important part is that + // we ensure that moving isn't erroneously tested as if it were idempotent. 
+ for (const auto& move : docsMoved()) { + failRetrieveForLid(move.getPrevLid()); + } + _bucketExecutor.schedule_single_deferred_task(); + sync(); + EXPECT_TRUE(_bmj->done()); + ASSERT_EQ(1u, bucketsModified().size()); + EXPECT_EQ(_ready.bucket(2), bucketsModified()[0]); +} TEST_F(ControllerFixture, require_that_notifyCreateBucket_causes_bucket_to_be_reconsidered_by_job) { diff --git a/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp b/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp index e010240f5f8..45cac1d02e9 100644 --- a/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp @@ -159,7 +159,7 @@ BucketMoveJob::needMove(const ScanIterator &itr) const { return noMove; } const bool wantReady = (shouldBeReady == Trinary::True) || isActive; - LOG(spam, "checkBucket(): bucket(%s), shouldBeReady(%s), active(%s)", + LOG(spam, "needMove(): bucket(%s), shouldBeReady(%s), active(%s)", itr.getBucket().toString().c_str(), toStr(shouldBeReady), toStr(isActive)); if (wantReady) { if (!hasNotReadyDocs) { @@ -241,6 +241,11 @@ BucketMoveJob::prepareMove(std::shared_ptr<BucketMoveJob> job, BucketMover::Move void BucketMoveJob::completeMove(GuardedMoveOps ops, IDestructorCallbackSP onDone) { BucketMover & mover = ops.mover(); + if (mover.cancelled()) { + LOG(spam, "completeMove(%s, mover@%p): mover already cancelled, not processing it further", + mover.getBucket().toString().c_str(), &mover); + return; + } mover.moveDocuments(std::move(ops.success()), std::move(onDone)); ops.failed().clear(); if (checkIfMoverComplete(mover)) { @@ -280,6 +285,7 @@ void BucketMoveJob::cancelBucket(BucketId bucket) { auto inFlight = _bucketsInFlight.find(bucket); if (inFlight != _bucketsInFlight.end()) { + LOG(spam, "cancelBucket(%s): cancelling existing mover %p", bucket.toString().c_str(), inFlight->second.get()); inFlight->second->cancel(); checkIfMoverComplete(*inFlight->second); } @@ 
-329,7 +335,7 @@ std::shared_ptr<BucketMover> BucketMoveJob::createMover(BucketId bucket, bool wantReady) { const MaintenanceDocumentSubDB &source(wantReady ? _notReady : _ready); const MaintenanceDocumentSubDB &target(wantReady ? _ready : _notReady); - LOG(debug, "checkBucket(): mover.setupForBucket(%s, source:%u, target:%u)", + LOG(debug, "createMover(): BucketMover::create(%s, source:%u, target:%u)", bucket.toString().c_str(), source.sub_db_id(), target.sub_db_id()); return BucketMover::create(bucket, &source, target.sub_db_id(), _moveHandler); } diff --git a/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp b/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp index e7977c7380b..7a1826fe8ff 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp @@ -79,6 +79,7 @@ BucketMover::BucketMover(const BucketId &bucket, const MaintenanceDocumentSubDB _started(0), _completed(0), _needReschedule(false), + _cancelled(false), _allScheduled(false), _lastGidValid(false), _lastGid() @@ -138,6 +139,7 @@ BucketMover::moveDocuments(std::vector<GuardedMoveOp> moveOps, IDestructorCallba void BucketMover::cancel() { + _cancelled = true; setAllScheduled(); _needReschedule.store(true, std::memory_order_relaxed); } diff --git a/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.h b/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.h index b8f1f7c732c..f908d019ca7 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.h +++ b/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.h @@ -123,6 +123,7 @@ public: const document::BucketId &getBucket() const { return _bucket; } void cancel(); + [[nodiscard]] bool cancelled() const noexcept { return _cancelled; } void setAllScheduled() { _allScheduled = true; } /// Signals all documents have been scheduled for move bool allScheduled() const 
{ return _allScheduled; } @@ -147,6 +148,7 @@ private: std::atomic<uint32_t> _started; std::atomic<uint32_t> _completed; std::atomic<bool> _needReschedule; + bool _cancelled; bool _allScheduled; // All moves started, or operation has been cancelled bool _lastGidValid; document::GlobalId _lastGid; |