diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2021-03-05 22:41:14 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-03-05 22:41:14 +0100 |
commit | f8adddfb8b1494f5b303279d6bedac2252f42068 (patch) | |
tree | 3607136cb7d36f390a962d2d0080fe29c7fb749f /searchcore/src | |
parent | 1e0331fad1ca9237f31564d5e716354cf676f70a (diff) |
Revert "Revert "If a document was removed or moved between the move started and the d…""
Diffstat (limited to 'searchcore/src')
6 files changed, 93 insertions, 17 deletions
diff --git a/searchcore/src/tests/proton/documentdb/documentbucketmover/bucketmover_common.h b/searchcore/src/tests/proton/documentdb/documentbucketmover/bucketmover_common.h index e955f017d67..f70a4bfad11 100644 --- a/searchcore/src/tests/proton/documentdb/documentbucketmover/bucketmover_common.h +++ b/searchcore/src/tests/proton/documentdb/documentbucketmover/bucketmover_common.h @@ -64,8 +64,13 @@ struct MyDocumentRetriever : public DocumentRetrieverBaseForTest { using DocumentVector = std::vector<Document::SP>; std::shared_ptr<const DocumentTypeRepo> _repo; DocumentVector _docs; + uint32_t _lid2Fail; - MyDocumentRetriever(std::shared_ptr<const DocumentTypeRepo> repo) : _repo(std::move(repo)), _docs() { + MyDocumentRetriever(std::shared_ptr<const DocumentTypeRepo> repo) + : _repo(std::move(repo)), + _docs(), + _lid2Fail(0) + { _docs.push_back(Document::UP()); // lid 0 invalid } @@ -76,9 +81,11 @@ struct MyDocumentRetriever : public DocumentRetrieverBaseForTest { DocumentMetaData getDocumentMetaData(const DocumentId &) const override { return DocumentMetaData(); } Document::UP getFullDocument(DocumentIdT lid) const override { - return Document::UP(_docs[lid]->clone()); + return (lid != _lid2Fail) ? 
Document::UP(_docs[lid]->clone()) : Document::UP(); } + void failRetrieveForLid(uint32_t lid) { _lid2Fail = lid; } + CachedSelect::SP parseSelect(const vespalib::string &) const override { return {}; } @@ -115,6 +122,10 @@ struct MySubDb { void insertDocs(const UserDocuments &docs_); + void failRetrieveForLid(uint32_t lid) { + _realRetriever->failRetrieveForLid(lid); + } + BucketId bucket(uint32_t userId) const { return _docs.getBucket(userId); } diff --git a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_v2_test.cpp b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_v2_test.cpp index 99692ec53bd..d3f4b2fbedd 100644 --- a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_v2_test.cpp +++ b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_v2_test.cpp @@ -70,6 +70,14 @@ struct ControllerFixtureBase : public ::testing::Test _bucketHandler.notifyBucketStateChanged(bucket, BucketInfo::ActiveState::NOT_ACTIVE); return *this; } + void failRetrieveForLid(uint32_t lid) { + _ready.failRetrieveForLid(lid); + _notReady.failRetrieveForLid(lid); + } + void fixRetriever() { + _ready.failRetrieveForLid(0); + _notReady.failRetrieveForLid(0); + } const MoveOperationVector &docsMoved() const { return _moveHandler._moves; } @@ -200,6 +208,28 @@ TEST_F(ControllerFixture, require_that_ready_bucket_is_moved_to_not_ready_if_buc EXPECT_EQ(_ready.bucket(2), bucketsModified()[0]); } +TEST_F(ControllerFixture, require_that_bucket_is_moved_even_with_error) +{ + // bucket 2 should be moved + addReady(_ready.bucket(1)); + _bmj.recompute(); + failRetrieveForLid(5); + EXPECT_FALSE(_bmj.done()); + EXPECT_TRUE(_bmj.scanAndMove(4, 3)); + EXPECT_TRUE(_bmj.done()); + sync(); + EXPECT_FALSE(_bmj.done()); + fixRetriever(); + EXPECT_TRUE(_bmj.scanAndMove(4, 3)); + EXPECT_TRUE(_bmj.done()); + sync(); + EXPECT_EQ(2u, docsMoved().size()); + assertEqual(_ready.bucket(2), _ready.docs(2)[0], 
1, 2, docsMoved()[0]); + assertEqual(_ready.bucket(2), _ready.docs(2)[1], 1, 2, docsMoved()[1]); + EXPECT_EQ(1u, bucketsModified().size()); + EXPECT_EQ(_ready.bucket(2), bucketsModified()[0]); +} + TEST_F(ControllerFixture, require_that_we_move_buckets_in_several_steps) { diff --git a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentmover_test.cpp b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentmover_test.cpp index f49d806b6d4..143d8f290c6 100644 --- a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentmover_test.cpp +++ b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentmover_test.cpp @@ -35,6 +35,7 @@ struct DocumentMoverTest : ::testing::Test test::UserDocumentsBuilder _builder; std::shared_ptr<bucketdb::BucketDBOwner> _bucketDB; MyMoveOperationLimiter _limiter; + //TODO When we retire the old bucket move job we must rewrite this test to use the BucketMover directly. DocumentBucketMover _mover; MySubDbTwoBuckets _source; bucketdb::BucketDBOwner _bucketDb; @@ -71,8 +72,10 @@ TEST_F(DocumentMoverTest, require_that_initial_bucket_mover_is_done) MyMoveOperationLimiter limiter; DocumentBucketMover mover(limiter, _bucketDb); EXPECT_TRUE(mover.bucketDone()); + EXPECT_FALSE(mover.needReschedule()); mover.moveDocuments(2); EXPECT_TRUE(mover.bucketDone()); + EXPECT_FALSE(mover.needReschedule()); } TEST_F(DocumentMoverTest, require_that_we_can_move_all_documents) @@ -136,4 +139,16 @@ TEST_F(DocumentMoverTest, require_that_we_can_move_documents_in_several_steps) EXPECT_EQ(5u, _handler._moves.size()); } +TEST_F(DocumentMoverTest, require_that_cancel_signal_rescheduling_need) { + setupForBucket(_source.bucket(1), 6, 9); + EXPECT_FALSE(_mover.bucketDone()); + EXPECT_FALSE(_mover.needReschedule()); + EXPECT_TRUE(moveDocuments(2)); + EXPECT_FALSE(_mover.bucketDone()); + EXPECT_FALSE(_mover.needReschedule()); + _mover.cancel(); + EXPECT_TRUE(_mover.bucketDone()); + EXPECT_TRUE(_mover.needReschedule()); +} + 
GTEST_MAIN_RUN_ALL_TESTS() diff --git a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp index 84b693db4ba..cef73d2975c 100644 --- a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp @@ -216,7 +216,7 @@ void BucketMoveJobV2::startMove(BucketMoverSP mover, size_t maxDocsToMove) { auto [keys, done] = mover->getKeysToMove(maxDocsToMove); if (done) { - mover->setBucketDone(); + mover->setAllScheduled(); } if (keys.empty()) return; if (_stopped.load(std::memory_order_relaxed)) return; @@ -247,12 +247,20 @@ BucketMoveJobV2::completeMove(BucketMoverSP mover, std::vector<GuardedMoveOp> op void BucketMoveJobV2::handleMoveResult(BucketMoverSP mover) { - if (mover->bucketDone() && mover->inSync()) { + bool bucketMoveComplete = mover->allScheduled() && mover->inSync(); + if (bucketMoveComplete || mover->needReschedule()) { BucketId bucket = mover->getBucket(); - assert(_bucketsInFlight.contains(bucket)); - _modifiedHandler.notifyBucketModified(bucket); + assert(mover->needReschedule() || _bucketsInFlight.contains(bucket)); _bucketsInFlight.erase(bucket); - updatePending(); + if (mover->needReschedule()) { + reconsiderBucket(_ready.meta_store()->getBucketDB().takeGuard(), bucket); + if (!_buckets2Move.contains(bucket)) { + // It failed, but all was moved anyway + _modifiedHandler.notifyBucketModified(bucket); + } + } else { + _modifiedHandler.notifyBucketModified(bucket); + } if (_postponedUntilSafe.contains(bucket)) { _postponedUntilSafe.erase(bucket); reconsiderBucket(_ready.meta_store()->getBucketDB().takeGuard(), bucket); @@ -261,6 +269,7 @@ BucketMoveJobV2::handleMoveResult(BucketMoverSP mover) { _postponedUntilSafe.erase(RECOMPUTE_TOKEN); recompute(); } + updatePending(); } } @@ -348,9 +357,9 @@ BucketMoveJobV2::moveDocs(size_t maxDocsToMove) { const auto & mover = _movers[index]; //Move, or reduce 
movers as we are tailing off - if (!mover->bucketDone()) { + if (!mover->allScheduled()) { startMove(mover, maxDocsToMove); - if (mover->bucketDone()) { + if (mover->allScheduled()) { _movers.erase(_movers.begin() + index); } } diff --git a/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp b/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp index 662e9e2e920..e33513df3f1 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp @@ -60,7 +60,8 @@ BucketMover::BucketMover(const BucketId &bucket, const MaintenanceDocumentSubDB _targetSubDbId(targetSubDbId), _started(0), _completed(0), - _bucketDone(false), + _needReschedule(false), + _allScheduled(false), _lastGidValid(false), _lastGid() { } @@ -95,6 +96,7 @@ BucketMover::createMoveOperations(std::vector<MoveKey> toMove) { for (MoveKey &key : toMove) { auto moveOp = createMoveOperation(key); if (!moveOp.first) { + _needReschedule.store(true, std::memory_order_relaxed); break; } successfulReads.push_back(std::move(moveOp)); @@ -109,6 +111,12 @@ BucketMover::moveDocuments(std::vector<GuardedMoveOp> moveOps, IDestructorCallba } } +void +BucketMover::cancel() { + setAllScheduled(); + _needReschedule.store(true, std::memory_order_relaxed); +} + } namespace proton { @@ -137,7 +145,7 @@ DocumentBucketMover::moveDocuments(size_t maxDocsToMove) { bool DocumentBucketMover::moveDocuments(size_t maxDocsToMove, IMoveOperationLimiter &limiter) { - if (_impl->bucketDone()) { + if (_impl->allScheduled()) { return true; } auto [keys, done] = _impl->getKeysToMove(maxDocsToMove); @@ -145,7 +153,7 @@ DocumentBucketMover::moveDocuments(size_t maxDocsToMove, IMoveOperationLimiter & auto moveOps = _impl->createMoveOperations(std::move(keys)); bool allOk = (numKeys == moveOps.size()); if (done && allOk) { - _impl->setBucketDone(); + _impl->setAllScheduled(); } if (moveOps.empty()) return allOk; diff 
--git a/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.h b/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.h index 807151b0769..05ceb4c17e6 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.h +++ b/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.h @@ -82,10 +82,11 @@ public: void moveDocument(MoveOperationUP moveOp, IDestructorCallbackSP onDone); const document::BucketId &getBucket() const { return _bucket; } - void cancel() { setBucketDone(); } - void setBucketDone() { _bucketDone = true; } + void cancel(); + void setAllScheduled() { _allScheduled = true; } /// Signals all documents have been scheduled for move - bool bucketDone() const { return _bucketDone; } + bool allScheduled() const { return _allScheduled; } + bool needReschedule() const { return _needReschedule.load(std::memory_order_relaxed); } const MaintenanceDocumentSubDB * getSource() const { return _source; } /// Must be called in master thread void updateLastValidGid(const document::GlobalId &gid) { @@ -103,7 +104,8 @@ private: std::atomic<uint32_t> _started; std::atomic<uint32_t> _completed; - bool _bucketDone; // All moves started, or operation has been cancelled + std::atomic<bool> _needReschedule; + bool _allScheduled; // All moves started, or operation has been cancelled bool _lastGidValid; document::GlobalId _lastGid; GuardedMoveOp createMoveOperation(MoveKey & key); @@ -139,8 +141,9 @@ public: const document::BucketId &getBucket() const { return _impl->getBucket(); } bool moveDocuments(size_t maxDocsToMove); void cancel() { _impl->cancel(); } + bool needReschedule() { return _impl && _impl->needReschedule(); } bool bucketDone() const { - return !_impl || _impl->bucketDone(); + return !_impl || _impl->allScheduled(); } const MaintenanceDocumentSubDB * getSource() const { return _impl->getSource(); } }; |