diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2022-05-12 16:07:44 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-05-12 16:07:44 +0200 |
commit | 6f7d407ebcfb430136c6335dd731e79af56965d7 (patch) | |
tree | 15d31c684758c573a954e913fd4460fd79d02761 /searchcore | |
parent | 2f1beb1a191580fde26e1b855a1fb7426c789e30 (diff) | |
parent | 742d5d49f147ff652d78f5b8d270c81008d96283 (diff) |
Merge pull request #22571 from vespa-engine/vekterli/improve-handling-of-preempted-bucket-move
Don't attempt to execute document moves from a cancelled bucket mover [run-systemtest]
Diffstat (limited to 'searchcore')
4 files changed, 43 insertions, 3 deletions
diff --git a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp index 9bc374b8386..2db34e45140 100644 --- a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp +++ b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp @@ -294,7 +294,7 @@ TEST_F(ControllerFixture, require_that_we_move_buckets_in_several_steps) EXPECT_EQ(0, numPending()); EXPECT_EQ(7u, docsMoved().size()); EXPECT_EQ(3u, bucketsModified().size()); - EXPECT_EQ(_ready.bucket(2), bucketsModified()[0]); + ASSERT_EQ(_ready.bucket(2), bucketsModified()[0]); EXPECT_EQ(_notReady.bucket(3), bucketsModified()[1]); EXPECT_EQ(_notReady.bucket(4), bucketsModified()[2]); } @@ -481,6 +481,36 @@ TEST_F(ControllerFixture, explicitly_active_not_ready_bucket_can_be_moved_to_rea EXPECT_EQ(_notReady.bucket(3), bucketsModified()[0]); } +TEST_F(ControllerFixture, bucket_change_notification_is_not_lost_with_concurrent_bucket_movers) +{ + addReady(_ready.bucket(1)); + _bmj->recompute(); // Bucket 1 should be (and is) ready, bucket 2 is ready (but should not be). + _bucketExecutor.defer_new_tasks(); // Don't execute immediately, we need to force multiple pending moves + masterExecute([this]() { + deactivateBucket(_ready.bucket(2)); + _bmj->scanAndMove(4, 3); + // New deactivation received from above prior to completion of scan. This can happen since + // moves are asynchronous and the distributor can send new (de-)activations before the old move is done. + // In our case, we've enforced that another move is already pending in the bucket executor. + deactivateBucket(_ready.bucket(2)); + _bmj->scanAndMove(4, 3); + }); + sync(); + ASSERT_EQ(_bucketExecutor.num_deferred_tasks(), 2u); + _bucketExecutor.schedule_single_deferred_task(); + sync(); + // We have to fake that moving a document marks it as not found in the source sub DB. + // This doesn't automatically happen when using mocks. The most important part is that + // we ensure that moving isn't erroneously tested as if it were idempotent. + for (const auto& move : docsMoved()) { + failRetrieveForLid(move.getPrevLid()); + } + _bucketExecutor.schedule_single_deferred_task(); + sync(); + EXPECT_TRUE(_bmj->done()); + ASSERT_EQ(1u, bucketsModified().size()); + EXPECT_EQ(_ready.bucket(2), bucketsModified()[0]); +} TEST_F(ControllerFixture, require_that_notifyCreateBucket_causes_bucket_to_be_reconsidered_by_job) { diff --git a/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp b/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp index e010240f5f8..45cac1d02e9 100644 --- a/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp @@ -159,7 +159,7 @@ BucketMoveJob::needMove(const ScanIterator &itr) const { return noMove; } const bool wantReady = (shouldBeReady == Trinary::True) || isActive; - LOG(spam, "checkBucket(): bucket(%s), shouldBeReady(%s), active(%s)", + LOG(spam, "needMove(): bucket(%s), shouldBeReady(%s), active(%s)", itr.getBucket().toString().c_str(), toStr(shouldBeReady), toStr(isActive)); if (wantReady) { if (!hasNotReadyDocs) { @@ -241,6 +241,11 @@ BucketMoveJob::prepareMove(std::shared_ptr<BucketMoveJob> job, BucketMover::Move void BucketMoveJob::completeMove(GuardedMoveOps ops, IDestructorCallbackSP onDone) { BucketMover & mover = ops.mover(); + if (mover.cancelled()) { + LOG(spam, "completeMove(%s, mover@%p): mover already cancelled, not processing it further", + mover.getBucket().toString().c_str(), &mover); + return; + } mover.moveDocuments(std::move(ops.success()), std::move(onDone)); ops.failed().clear(); if (checkIfMoverComplete(mover)) { @@ -280,6 +285,7 @@ void BucketMoveJob::cancelBucket(BucketId bucket) { auto inFlight = _bucketsInFlight.find(bucket); if (inFlight != _bucketsInFlight.end()) { + LOG(spam, "cancelBucket(%s): cancelling existing mover %p", bucket.toString().c_str(), inFlight->second.get()); inFlight->second->cancel(); checkIfMoverComplete(*inFlight->second); } @@ -329,7 +335,7 @@ std::shared_ptr<BucketMover> BucketMoveJob::createMover(BucketId bucket, bool wantReady) { const MaintenanceDocumentSubDB &source(wantReady ? _notReady : _ready); const MaintenanceDocumentSubDB &target(wantReady ? _ready : _notReady); - LOG(debug, "checkBucket(): mover.setupForBucket(%s, source:%u, target:%u)", + LOG(debug, "createMover(): BucketMover::create(%s, source:%u, target:%u)", bucket.toString().c_str(), source.sub_db_id(), target.sub_db_id()); return BucketMover::create(bucket, &source, target.sub_db_id(), _moveHandler); } diff --git a/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp b/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp index e7977c7380b..7a1826fe8ff 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp @@ -79,6 +79,7 @@ BucketMover::BucketMover(const BucketId &bucket, const MaintenanceDocumentSubDB _started(0), _completed(0), _needReschedule(false), + _cancelled(false), _allScheduled(false), _lastGidValid(false), _lastGid() @@ -138,6 +139,7 @@ BucketMover::moveDocuments(std::vector<GuardedMoveOp> moveOps, IDestructorCallba void BucketMover::cancel() { + _cancelled = true; setAllScheduled(); _needReschedule.store(true, std::memory_order_relaxed); } diff --git a/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.h b/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.h index b8f1f7c732c..f908d019ca7 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.h +++ b/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.h @@ -123,6 +123,7 @@ public: const document::BucketId &getBucket() const { return _bucket; } void cancel(); + [[nodiscard]] bool cancelled() const noexcept { return _cancelled; } void setAllScheduled() { _allScheduled = true; } /// Signals all documents have been scheduled for move bool allScheduled() const { return _allScheduled; } @@ -147,6 +148,7 @@ private: std::atomic<uint32_t> _started; std::atomic<uint32_t> _completed; std::atomic<bool> _needReschedule; + bool _cancelled; bool _allScheduled; // All moves started, or operation has been cancelled bool _lastGidValid; document::GlobalId _lastGid; |