diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2021-03-05 13:34:05 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2021-03-05 13:34:05 +0000 |
commit | da7b6f74bc8149b02b777710ab24520196840e29 (patch) | |
tree | 256b411cf58151b65ee8dd9872a3c5dedb1bc577 /searchcore | |
parent | 1eeeb4e17bf1ea5ce2e3f2cb74ee9474dd4a396e (diff) |
If a document was removed or moved between the time the move started and the document was retrieved,
there would be a race with state change or other bucket changes requiring the bucket to be reconsidered.
The mover would appear in sync prior to completing the move in the master thread.
This should now be handled by tracking the failed state in the mover.
Diffstat (limited to 'searchcore')
6 files changed, 93 insertions, 17 deletions
diff --git a/searchcore/src/tests/proton/documentdb/documentbucketmover/bucketmover_common.h b/searchcore/src/tests/proton/documentdb/documentbucketmover/bucketmover_common.h index e955f017d67..f70a4bfad11 100644 --- a/searchcore/src/tests/proton/documentdb/documentbucketmover/bucketmover_common.h +++ b/searchcore/src/tests/proton/documentdb/documentbucketmover/bucketmover_common.h @@ -64,8 +64,13 @@ struct MyDocumentRetriever : public DocumentRetrieverBaseForTest { using DocumentVector = std::vector<Document::SP>; std::shared_ptr<const DocumentTypeRepo> _repo; DocumentVector _docs; + uint32_t _lid2Fail; - MyDocumentRetriever(std::shared_ptr<const DocumentTypeRepo> repo) : _repo(std::move(repo)), _docs() { + MyDocumentRetriever(std::shared_ptr<const DocumentTypeRepo> repo) + : _repo(std::move(repo)), + _docs(), + _lid2Fail(0) + { _docs.push_back(Document::UP()); // lid 0 invalid } @@ -76,9 +81,11 @@ struct MyDocumentRetriever : public DocumentRetrieverBaseForTest { DocumentMetaData getDocumentMetaData(const DocumentId &) const override { return DocumentMetaData(); } Document::UP getFullDocument(DocumentIdT lid) const override { - return Document::UP(_docs[lid]->clone()); + return (lid != _lid2Fail) ? 
Document::UP(_docs[lid]->clone()) : Document::UP(); } + void failRetrieveForLid(uint32_t lid) { _lid2Fail = lid; } + CachedSelect::SP parseSelect(const vespalib::string &) const override { return {}; } @@ -115,6 +122,10 @@ struct MySubDb { void insertDocs(const UserDocuments &docs_); + void failRetrieveForLid(uint32_t lid) { + _realRetriever->failRetrieveForLid(lid); + } + BucketId bucket(uint32_t userId) const { return _docs.getBucket(userId); } diff --git a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_v2_test.cpp b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_v2_test.cpp index 99692ec53bd..d3f4b2fbedd 100644 --- a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_v2_test.cpp +++ b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_v2_test.cpp @@ -70,6 +70,14 @@ struct ControllerFixtureBase : public ::testing::Test _bucketHandler.notifyBucketStateChanged(bucket, BucketInfo::ActiveState::NOT_ACTIVE); return *this; } + void failRetrieveForLid(uint32_t lid) { + _ready.failRetrieveForLid(lid); + _notReady.failRetrieveForLid(lid); + } + void fixRetriever() { + _ready.failRetrieveForLid(0); + _notReady.failRetrieveForLid(0); + } const MoveOperationVector &docsMoved() const { return _moveHandler._moves; } @@ -200,6 +208,28 @@ TEST_F(ControllerFixture, require_that_ready_bucket_is_moved_to_not_ready_if_buc EXPECT_EQ(_ready.bucket(2), bucketsModified()[0]); } +TEST_F(ControllerFixture, require_that_bucket_is_moved_even_with_error) +{ + // bucket 2 should be moved + addReady(_ready.bucket(1)); + _bmj.recompute(); + failRetrieveForLid(5); + EXPECT_FALSE(_bmj.done()); + EXPECT_TRUE(_bmj.scanAndMove(4, 3)); + EXPECT_TRUE(_bmj.done()); + sync(); + EXPECT_FALSE(_bmj.done()); + fixRetriever(); + EXPECT_TRUE(_bmj.scanAndMove(4, 3)); + EXPECT_TRUE(_bmj.done()); + sync(); + EXPECT_EQ(2u, docsMoved().size()); + assertEqual(_ready.bucket(2), _ready.docs(2)[0], 
1, 2, docsMoved()[0]); + assertEqual(_ready.bucket(2), _ready.docs(2)[1], 1, 2, docsMoved()[1]); + EXPECT_EQ(1u, bucketsModified().size()); + EXPECT_EQ(_ready.bucket(2), bucketsModified()[0]); +} + TEST_F(ControllerFixture, require_that_we_move_buckets_in_several_steps) { diff --git a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentmover_test.cpp b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentmover_test.cpp index f49d806b6d4..143d8f290c6 100644 --- a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentmover_test.cpp +++ b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentmover_test.cpp @@ -35,6 +35,7 @@ struct DocumentMoverTest : ::testing::Test test::UserDocumentsBuilder _builder; std::shared_ptr<bucketdb::BucketDBOwner> _bucketDB; MyMoveOperationLimiter _limiter; + //TODO When we retire old bucket move job me must make rewrite this test to use the BucketMover directly. DocumentBucketMover _mover; MySubDbTwoBuckets _source; bucketdb::BucketDBOwner _bucketDb; @@ -71,8 +72,10 @@ TEST_F(DocumentMoverTest, require_that_initial_bucket_mover_is_done) MyMoveOperationLimiter limiter; DocumentBucketMover mover(limiter, _bucketDb); EXPECT_TRUE(mover.bucketDone()); + EXPECT_FALSE(mover.needReschedule()); mover.moveDocuments(2); EXPECT_TRUE(mover.bucketDone()); + EXPECT_FALSE(mover.needReschedule()); } TEST_F(DocumentMoverTest, require_that_we_can_move_all_documents) @@ -136,4 +139,16 @@ TEST_F(DocumentMoverTest, require_that_we_can_move_documents_in_several_steps) EXPECT_EQ(5u, _handler._moves.size()); } +TEST_F(DocumentMoverTest, require_that_cancel_signal_rescheduling_need) { + setupForBucket(_source.bucket(1), 6, 9); + EXPECT_FALSE(_mover.bucketDone()); + EXPECT_FALSE(_mover.needReschedule()); + EXPECT_TRUE(moveDocuments(2)); + EXPECT_FALSE(_mover.bucketDone()); + EXPECT_FALSE(_mover.needReschedule()); + _mover.cancel(); + EXPECT_TRUE(_mover.bucketDone()); + EXPECT_TRUE(_mover.needReschedule()); +} + 
GTEST_MAIN_RUN_ALL_TESTS() diff --git a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp index 84b693db4ba..cef73d2975c 100644 --- a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp @@ -216,7 +216,7 @@ void BucketMoveJobV2::startMove(BucketMoverSP mover, size_t maxDocsToMove) { auto [keys, done] = mover->getKeysToMove(maxDocsToMove); if (done) { - mover->setBucketDone(); + mover->setAllScheduled(); } if (keys.empty()) return; if (_stopped.load(std::memory_order_relaxed)) return; @@ -247,12 +247,20 @@ BucketMoveJobV2::completeMove(BucketMoverSP mover, std::vector<GuardedMoveOp> op void BucketMoveJobV2::handleMoveResult(BucketMoverSP mover) { - if (mover->bucketDone() && mover->inSync()) { + bool bucketMoveComplete = mover->allScheduled() && mover->inSync(); + if (bucketMoveComplete || mover->needReschedule()) { BucketId bucket = mover->getBucket(); - assert(_bucketsInFlight.contains(bucket)); - _modifiedHandler.notifyBucketModified(bucket); + assert(mover->needReschedule() || _bucketsInFlight.contains(bucket)); _bucketsInFlight.erase(bucket); - updatePending(); + if (mover->needReschedule()) { + reconsiderBucket(_ready.meta_store()->getBucketDB().takeGuard(), bucket); + if (!_buckets2Move.contains(bucket)) { + // It failed, but all was moved anyway + _modifiedHandler.notifyBucketModified(bucket); + } + } else { + _modifiedHandler.notifyBucketModified(bucket); + } if (_postponedUntilSafe.contains(bucket)) { _postponedUntilSafe.erase(bucket); reconsiderBucket(_ready.meta_store()->getBucketDB().takeGuard(), bucket); @@ -261,6 +269,7 @@ BucketMoveJobV2::handleMoveResult(BucketMoverSP mover) { _postponedUntilSafe.erase(RECOMPUTE_TOKEN); recompute(); } + updatePending(); } } @@ -348,9 +357,9 @@ BucketMoveJobV2::moveDocs(size_t maxDocsToMove) { const auto & mover = _movers[index]; //Move, or reduce 
movers as we are tailing off - if (!mover->bucketDone()) { + if (!mover->allScheduled()) { startMove(mover, maxDocsToMove); - if (mover->bucketDone()) { + if (mover->allScheduled()) { _movers.erase(_movers.begin() + index); } } diff --git a/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp b/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp index 662e9e2e920..e33513df3f1 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp @@ -60,7 +60,8 @@ BucketMover::BucketMover(const BucketId &bucket, const MaintenanceDocumentSubDB _targetSubDbId(targetSubDbId), _started(0), _completed(0), - _bucketDone(false), + _needReschedule(false), + _allScheduled(false), _lastGidValid(false), _lastGid() { } @@ -95,6 +96,7 @@ BucketMover::createMoveOperations(std::vector<MoveKey> toMove) { for (MoveKey &key : toMove) { auto moveOp = createMoveOperation(key); if (!moveOp.first) { + _needReschedule.store(true, std::memory_order_relaxed); break; } successfulReads.push_back(std::move(moveOp)); @@ -109,6 +111,12 @@ BucketMover::moveDocuments(std::vector<GuardedMoveOp> moveOps, IDestructorCallba } } +void +BucketMover::cancel() { + setAllScheduled(); + _needReschedule.store(true, std::memory_order_relaxed); +} + } namespace proton { @@ -137,7 +145,7 @@ DocumentBucketMover::moveDocuments(size_t maxDocsToMove) { bool DocumentBucketMover::moveDocuments(size_t maxDocsToMove, IMoveOperationLimiter &limiter) { - if (_impl->bucketDone()) { + if (_impl->allScheduled()) { return true; } auto [keys, done] = _impl->getKeysToMove(maxDocsToMove); @@ -145,7 +153,7 @@ DocumentBucketMover::moveDocuments(size_t maxDocsToMove, IMoveOperationLimiter & auto moveOps = _impl->createMoveOperations(std::move(keys)); bool allOk = (numKeys == moveOps.size()); if (done && allOk) { - _impl->setBucketDone(); + _impl->setAllScheduled(); } if (moveOps.empty()) return allOk; diff 
--git a/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.h b/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.h index 807151b0769..05ceb4c17e6 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.h +++ b/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.h @@ -82,10 +82,11 @@ public: void moveDocument(MoveOperationUP moveOp, IDestructorCallbackSP onDone); const document::BucketId &getBucket() const { return _bucket; } - void cancel() { setBucketDone(); } - void setBucketDone() { _bucketDone = true; } + void cancel(); + void setAllScheduled() { _allScheduled = true; } /// Signals all documents have been scheduled for move - bool bucketDone() const { return _bucketDone; } + bool allScheduled() const { return _allScheduled; } + bool needReschedule() const { return _needReschedule.load(std::memory_order_relaxed); } const MaintenanceDocumentSubDB * getSource() const { return _source; } /// Must be called in master thread void updateLastValidGid(const document::GlobalId &gid) { @@ -103,7 +104,8 @@ private: std::atomic<uint32_t> _started; std::atomic<uint32_t> _completed; - bool _bucketDone; // All moves started, or operation has been cancelled + std::atomic<bool> _needReschedule; + bool _allScheduled; // All moves started, or operation has been cancelled bool _lastGidValid; document::GlobalId _lastGid; GuardedMoveOp createMoveOperation(MoveKey & key); @@ -139,8 +141,9 @@ public: const document::BucketId &getBucket() const { return _impl->getBucket(); } bool moveDocuments(size_t maxDocsToMove); void cancel() { _impl->cancel(); } + bool needReschedule() { return _impl && _impl->needReschedule(); } bool bucketDone() const { - return !_impl || _impl->bucketDone(); + return !_impl || _impl->allScheduled(); } const MaintenanceDocumentSubDB * getSource() const { return _impl->getSource(); } }; |