diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-10-30 23:41:41 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2020-10-30 23:41:41 +0000 |
commit | f6cb5c24a40953b6e8d130ea3b39edc93a1b0219 (patch) | |
tree | fb5fc519a203acbdb7ac6a75958643a428f80fd1 /searchcore | |
parent | 24ee2b7405410a0f89a5bbb649daa84358a4077c (diff) |
Check if the lid might block due to missing commit.
If so pull back and reschedule.
Rescheduling will give a busy loop, but that is rare and fine.
Diffstat (limited to 'searchcore')
10 files changed, 67 insertions, 38 deletions
diff --git a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp index 7126cb04892..7df8641cc30 100644 --- a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp +++ b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp @@ -188,7 +188,8 @@ MySubDb::MySubDb(const std::shared_ptr<const DocumentTypeRepo> &repo, std::share _metaStore(*_metaStoreSP), _realRetriever(std::make_shared<MyDocumentRetriever>(repo)), _retriever(_realRetriever), - _subDb("my_sub_db", subDbId, _metaStoreSP, _retriever, IFeedView::SP()), _docs(), + _subDb("my_sub_db", subDbId, _metaStoreSP, _retriever, IFeedView::SP(), nullptr), + _docs(), _bucketDBHandler(*bucketDB) { _bucketDBHandler.addDocumentMetaStore(_metaStoreSP.get(), 0); @@ -239,7 +240,8 @@ struct MoveFixture sourceSubDbId, _source._subDb.meta_store(), _source._subDb.retriever(), - _source._subDb.feed_view()); + _source._subDb.feed_view(), + nullptr); _mover.setupForBucket(bucket, &_source._subDb, targetSubDbId, _handler, _bucketDb); } void moveDocuments(size_t maxDocsToMove) { diff --git a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_compaction_test.cpp b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_compaction_test.cpp index f98fa5e58d0..a0f4ccf4f3c 100644 --- a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_compaction_test.cpp +++ b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_compaction_test.cpp @@ -228,6 +228,7 @@ struct MyDocumentRetriever : public DocumentRetrieverBaseForTest { struct MySubDb { test::DummyDocumentSubDb sub_db; MaintenanceDocumentSubDB maintenance_sub_db; + PendingLidTracker _pendingLidsForCommit; MySubDb(std::shared_ptr<BucketDBOwner> bucket_db, const MyDocumentStore& store, const std::shared_ptr<const DocumentTypeRepo> & 
repo); ~MySubDb(); }; @@ -236,7 +237,8 @@ MySubDb::MySubDb(std::shared_ptr<BucketDBOwner> bucket_db, const MyDocumentStore : sub_db(std::move(bucket_db), SUBDB_ID), maintenance_sub_db(sub_db.getName(), sub_db.getSubDbId(), sub_db.getDocumentMetaStoreContext().getSP(), std::make_shared<MyDocumentRetriever>(repo, store), - std::make_shared<MyFeedView>(repo)) + std::make_shared<MyFeedView>(repo), + &_pendingLidsForCommit) { } diff --git a/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp b/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp index cd6b23e5e26..e408bdcfd2d 100644 --- a/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp +++ b/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp @@ -493,7 +493,7 @@ MyDocumentSubDB::getSubDB() return MaintenanceDocumentSubDB("my_sub_db", _subDBId, _metaStoreSP, retriever, - IFeedView::SP()); + IFeedView::SP(), nullptr); } diff --git a/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp b/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp index 17b8b9e30e0..084666f5859 100644 --- a/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp @@ -116,7 +116,7 @@ BucketMoveJob::scanBuckets(size_t maxBucketsToScan, IFrozenBucketHandler::Exclus return ScanResult(bucketsScanned, passDone); } -void +bool BucketMoveJob::moveDocuments(DocumentBucketMover &mover, size_t maxDocsToMove, IFrozenBucketHandler::ExclusiveBucketGuard::UP & bucketGuard) @@ -125,14 +125,15 @@ BucketMoveJob::moveDocuments(DocumentBucketMover &mover, bucketGuard = _frozenBuckets.acquireExclusiveBucket(mover.getBucket()); if (! 
bucketGuard) { maybeDelayMover(mover, mover.getBucket()); - return; + return true; } } assert(mover.getBucket() == bucketGuard->getBucket()); - mover.moveDocuments(maxDocsToMove); + if ( ! mover.moveDocuments(maxDocsToMove)) return false; if (mover.bucketDone()) { _modifiedHandler.notifyBucketModified(mover.getBucket()); } + return true; } namespace { @@ -279,12 +280,12 @@ BucketMoveJob::changedCalculator() maybeCancelMover(_delayedMover); } -void +bool BucketMoveJob::scanAndMove(size_t maxBucketsToScan, size_t maxDocsToMove) { if (done()) { - return; + return true; } IFrozenBucketHandler::ExclusiveBucketGuard::UP bucketGuard; // Look for delayed bucket to be processed now @@ -297,8 +298,7 @@ BucketMoveJob::scanAndMove(size_t maxBucketsToScan, } } if (!_delayedMover.bucketDone()) { - moveDocuments(_delayedMover, maxDocsToMove, bucketGuard); - return; + return moveDocuments(_delayedMover, maxDocsToMove, bucketGuard); } if (_mover.bucketDone()) { size_t bucketsScanned = 0; @@ -324,8 +324,9 @@ BucketMoveJob::scanAndMove(size_t maxBucketsToScan, } } if (!_mover.bucketDone()) { - moveDocuments(_mover, maxDocsToMove, bucketGuard); + return moveDocuments(_mover, maxDocsToMove, bucketGuard); } + return true; } bool @@ -334,7 +335,10 @@ BucketMoveJob::run() if (isBlocked()) { return true; // indicate work is done, since node state is bad } - scanAndMove(200, 1); + /// Returning false here will immediately post the job back on the executor. This will give a busy loop, + /// but this is considered fine as it is very rare and it will be intermingled with multiple feed operations. + if ( ! 
scanAndMove(200, 1) ) return false; + if (isBlocked(BlockedReason::OUTSTANDING_OPS)) { return true; } diff --git a/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.h b/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.h index 7147613caee..015a779b923 100644 --- a/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.h +++ b/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.h @@ -113,7 +113,7 @@ private: void maybeCancelMover(DocumentBucketMover &mover); void maybeDelayMover(DocumentBucketMover &mover, document::BucketId bucket); - void + bool moveDocuments(DocumentBucketMover &mover, size_t maxDocsToMove, IFrozenBucketHandler::ExclusiveBucketGuard::UP & bucketGuard); @@ -151,10 +151,10 @@ public: const vespalib::string &docTypeName, document::BucketSpace bucketSpace); - virtual ~BucketMoveJob(); + ~BucketMoveJob() override; void changedCalculator(); - void scanAndMove(size_t maxBucketsToScan, size_t maxDocsToMove); + bool scanAndMove(size_t maxBucketsToScan, size_t maxDocsToMove); bool done() const { // Ignores _delayedBucketsFrozen, since no work can be done there yet @@ -166,22 +166,22 @@ public: } // IMaintenanceJob API - virtual bool run() override; + bool run() override; // IClusterStateChangedHandler API - virtual void notifyClusterStateChanged(const IBucketStateCalculator::SP &newCalc) override; + void notifyClusterStateChanged(const IBucketStateCalculator::SP &newCalc) override; // IBucketFreezeListener API - virtual void notifyThawedBucket(const document::BucketId &bucket) override; + void notifyThawedBucket(const document::BucketId &bucket) override; // IBucketStateChangedHandler API - virtual void notifyBucketStateChanged(const document::BucketId &bucketId, + void notifyBucketStateChanged(const document::BucketId &bucketId, storage::spi::BucketInfo::ActiveState newState) override; - virtual void notifyDiskMemUsage(DiskMemUsageState state) override; + void notifyDiskMemUsage(DiskMemUsageState state) override; // 
bucketdb::IBucketCreateListener API - virtual void notifyCreateBucket(const document::BucketId &bucket) override; + void notifyCreateBucket(const document::BucketId &bucket) override; }; } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp b/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp index 3d5c51c4346..54c1e0fb5a8 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp @@ -19,14 +19,15 @@ namespace proton { typedef IDocumentMetaStore::Iterator Iterator; -void +bool DocumentBucketMover::moveDocument(DocumentIdT lid, const document::GlobalId &gid, Timestamp timestamp) { - Document::SP doc(_source->retriever()->getFullDocument(lid).release()); + if ( _source->lidNeedsCommit(lid) ) return false; + Document::SP doc(_source->retriever()->getFullDocument(lid)); if (!doc || doc->getId().getGlobalId() != gid) - return; // Failed to retrieve document, removed or changed identity + return true; // Failed to retrieve document, removed or changed identity // TODO(geirst): what if doc is NULL? BucketId bucketId = _bucket.stripUnused(); MoveOperation op(bucketId, timestamp, doc, DbDocumentId(_source->sub_db_id(), lid), _targetSubDbId); @@ -37,6 +38,7 @@ DocumentBucketMover::moveDocument(DocumentIdT lid, _bucketDb->takeGuard()->cacheBucket(bucketId); _handler->handleMove(op, _limiter.beginOperation()); _bucketDb->takeGuard()->uncacheBucket(); + return true; } @@ -93,15 +95,16 @@ public: } -void DocumentBucketMover::setBucketDone() { +void +DocumentBucketMover::setBucketDone() { _bucketDone = true; } -void +bool DocumentBucketMover::moveDocuments(size_t maxDocsToMove) { if (_bucketDone) { - return; + return true; } Iterator itr = (_lastGidValid ? 
_source->meta_store()->upperBound(_lastGid) : _source->meta_store()->lowerBound(_bucket)); @@ -131,8 +134,9 @@ DocumentBucketMover::moveDocuments(size_t maxDocsToMove) setBucketDone(); } for (const MoveKey & key : toMove) { - moveDocument(key._lid, key._gid, key._timestamp); + if ( ! moveDocument(key._lid, key._gid, key._timestamp)) return false; } + return true; } diff --git a/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.h b/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.h index 5ed2226ce65..53a2bdd2e4f 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.h +++ b/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.h @@ -33,7 +33,7 @@ private: document::GlobalId _lastGid; bool _lastGidValid; - void moveDocument(search::DocumentIdT lid, + bool moveDocument(search::DocumentIdT lid, const document::GlobalId &gid, storage::spi::Timestamp timestamp); @@ -46,7 +46,7 @@ public: IDocumentMoveHandler &handler, BucketDBOwner &bucketDb); const document::BucketId &getBucket() const { return _bucket; } - void moveDocuments(size_t maxDocsToMove); + bool moveDocuments(size_t maxDocsToMove); void cancel() { setBucketDone(); } bool bucketDone() const { return _bucketDone; } const MaintenanceDocumentSubDB * getSource() const { return _source; } diff --git a/searchcore/src/vespa/searchcore/proton/server/documentsubdbcollection.cpp b/searchcore/src/vespa/searchcore/proton/server/documentsubdbcollection.cpp index 85d9c767535..61cf45a81d7 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentsubdbcollection.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/documentsubdbcollection.cpp @@ -128,7 +128,7 @@ DocumentSubDBCollection::createRetrievers() namespace { -IDocumentRetriever::SP +std::shared_ptr<CommitAndWaitDocumentRetriever> wrapRetriever(IDocumentRetriever::SP retriever, ILidCommitState & unCommittedLidsTracker) { return 
std::make_shared<CommitAndWaitDocumentRetriever>(std::move(retriever), unCommittedLidsTracker); @@ -163,18 +163,21 @@ void DocumentSubDBCollection::maintenanceSync(MaintenanceController &mc) { getReadySubDB()->getDocumentMetaStoreContext().getSP(), wrapRetriever((*retrievers)[_readySubDbId], getReadySubDB()->getUncommittedLidsTracker()), - getReadySubDB()->getFeedView()); + getReadySubDB()->getFeedView(), + &getReadySubDB()->getUncommittedLidsTracker()); MaintenanceDocumentSubDB remSubDB(getRemSubDB()->getName(), _remSubDbId, getRemSubDB()->getDocumentMetaStoreContext().getSP(), wrapRetriever((*retrievers)[_remSubDbId], getRemSubDB()->getUncommittedLidsTracker()), - getRemSubDB()->getFeedView()); + getRemSubDB()->getFeedView(), + &getRemSubDB()->getUncommittedLidsTracker()); MaintenanceDocumentSubDB notReadySubDB(getNotReadySubDB()->getName(), _notReadySubDbId, getNotReadySubDB()->getDocumentMetaStoreContext().getSP(), wrapRetriever((*retrievers)[_notReadySubDbId], getNotReadySubDB()->getUncommittedLidsTracker()), - getNotReadySubDB()->getFeedView()); + getNotReadySubDB()->getFeedView(), + &getNotReadySubDB()->getUncommittedLidsTracker()); mc.syncSubDBs(readySubDB, remSubDB, notReadySubDB); } diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenancedocumentsubdb.cpp b/searchcore/src/vespa/searchcore/proton/server/maintenancedocumentsubdb.cpp index 6b781eb8e0f..8ef859ed194 100644 --- a/searchcore/src/vespa/searchcore/proton/server/maintenancedocumentsubdb.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/maintenancedocumentsubdb.cpp @@ -1,6 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
#include "maintenancedocumentsubdb.h" +#include <vespa/searchcore/proton/common/ipendinglidtracker.h> namespace proton { @@ -19,12 +20,14 @@ MaintenanceDocumentSubDB::MaintenanceDocumentSubDB(const vespalib::string& name, uint32_t sub_db_id, IDocumentMetaStore::SP meta_store, IDocumentRetriever::SP retriever, - IFeedView::SP feed_view) + IFeedView::SP feed_view, + const ILidCommitState * pendingLidsForCommit) : _name(name), _sub_db_id(sub_db_id), _meta_store(std::move(meta_store)), _retriever(std::move(retriever)), - _feed_view(std::move(feed_view)) + _feed_view(std::move(feed_view)), + _pendingLidsForCommit(pendingLidsForCommit) { } @@ -38,4 +41,10 @@ MaintenanceDocumentSubDB::clear() _feed_view.reset(); } +bool +MaintenanceDocumentSubDB::lidNeedsCommit(search::DocumentIdT lid) const { + return (_pendingLidsForCommit && + (_pendingLidsForCommit->getState(lid) == ILidCommitState::State::NEED_COMMIT)); +} + } diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenancedocumentsubdb.h b/searchcore/src/vespa/searchcore/proton/server/maintenancedocumentsubdb.h index bfe73f361b6..5710e893503 100644 --- a/searchcore/src/vespa/searchcore/proton/server/maintenancedocumentsubdb.h +++ b/searchcore/src/vespa/searchcore/proton/server/maintenancedocumentsubdb.h @@ -8,6 +8,8 @@ namespace proton { +class ILidCommitState; + /** * The view of a document sub db as seen from the maintenance controller * and various maintenance jobs. 
@@ -20,6 +22,7 @@ private: IDocumentMetaStore::SP _meta_store; IDocumentRetriever::SP _retriever; IFeedView::SP _feed_view; + const ILidCommitState *_pendingLidsForCommit; public: MaintenanceDocumentSubDB(); @@ -29,7 +32,8 @@ public: uint32_t sub_db_id, IDocumentMetaStore::SP meta_store, IDocumentRetriever::SP retriever, - IFeedView::SP feed_view); + IFeedView::SP feed_view, + const ILidCommitState *); const vespalib::string& name() const { return _name; } uint32_t sub_db_id() const { return _sub_db_id; } @@ -38,6 +42,7 @@ public: const IFeedView::SP& feed_view() const { return _feed_view; } bool valid() const { return _meta_store.get() != nullptr; } + bool lidNeedsCommit(search::DocumentIdT lid) const; void clear(); }; |