diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2021-02-10 12:16:00 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2021-02-10 12:27:55 +0000 |
commit | 7cde1032d8c8dcf7e8a37ea477015e5c331fb56f (patch) | |
tree | 01b127684d6913f37e76eabaaeddf3c7d47dfc86 /searchcore | |
parent | ae489bc7089f2873041fc76af82e21dad7090f2e (diff) |
Refactor to split out work that can be done in a separate thread.
Diffstat (limited to 'searchcore')
9 files changed, 194 insertions, 126 deletions
diff --git a/searchcore/src/tests/proton/documentdb/documentbucketmover/bucketmover_common.h b/searchcore/src/tests/proton/documentdb/documentbucketmover/bucketmover_common.h index a7e568dc00f..4a6bb4fa938 100644 --- a/searchcore/src/tests/proton/documentdb/documentbucketmover/bucketmover_common.h +++ b/searchcore/src/tests/proton/documentdb/documentbucketmover/bucketmover_common.h @@ -4,13 +4,12 @@ #include <vespa/searchcore/proton/bucketdb/bucket_create_notifier.h> #include <vespa/searchcore/proton/test/bucketfactory.h> #include <vespa/searchcore/proton/feedoperation/moveoperation.h> -#include <vespa/searchcore/proton/server/bucketmovejob.h> -#include <vespa/searchcore/proton/server/documentbucketmover.h> #include <vespa/searchcore/proton/server/i_move_operation_limiter.h> #include <vespa/searchcore/proton/server/idocumentmovehandler.h> #include <vespa/searchcore/proton/server/imaintenancejobrunner.h> #include <vespa/searchcore/proton/server/maintenancedocumentsubdb.h> #include <vespa/searchcore/proton/server/ibucketmodifiedhandler.h> +#include <vespa/searchcore/proton/server/i_maintenance_job.h> #include <vespa/searchcore/proton/test/buckethandler.h> #include <vespa/searchcore/proton/test/clusterstatehandler.h> #include <vespa/searchcore/proton/test/disk_mem_usage_notifier.h> @@ -124,6 +123,14 @@ struct MySubDb { } }; +struct MyCountJobRunner : public IMaintenanceJobRunner { + uint32_t runCount; + explicit MyCountJobRunner(IMaintenanceJob &job) : runCount(0) { + job.registerRunner(this); + } + void run() override { ++runCount; } +}; + bool assertEqual(const document::BucketId &bucket, const proton::test::Document &doc, uint32_t sourceSubDbId, uint32_t targetSubDbId, const MoveOperation &op); diff --git a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp index 70b173ece75..56cf1ae5389 100644 --- 
a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp +++ b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp @@ -1,7 +1,7 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "bucketmover_common.h" -#include <vespa/vespalib/testkit/testapp.h> +#include <vespa/searchcore/proton/server/bucketmovejob.h> #include <vespa/vespalib/gtest/gtest.h> #include <vespa/log/log.h> @@ -57,14 +57,6 @@ struct MyFrozenBucketHandler : public IFrozenBucketHandler } }; -struct MyCountJobRunner : public IMaintenanceJobRunner { - uint32_t runCount; - explicit MyCountJobRunner(IMaintenanceJob &job) : runCount(0) { - job.registerRunner(this); - } - void run() override { ++runCount; } -}; - struct ControllerFixtureBase : public ::testing::Test { test::UserDocumentsBuilder _builder; @@ -152,7 +144,7 @@ ControllerFixtureBase::ControllerFixtureBase(const BlockableMaintenanceJobConfig { } -ControllerFixtureBase::~ControllerFixtureBase() {} +ControllerFixtureBase::~ControllerFixtureBase() = default; constexpr double RESOURCE_LIMIT_FACTOR = 1.0; constexpr uint32_t MAX_OUTSTANDING_OPS = 10; const BlockableMaintenanceJobConfig BLOCKABLE_CONFIG(RESOURCE_LIMIT_FACTOR, MAX_OUTSTANDING_OPS); diff --git a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentmover_test.cpp b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentmover_test.cpp index 565050f1052..0d978e6e463 100644 --- a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentmover_test.cpp +++ b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentmover_test.cpp @@ -1,6 +1,7 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
#include "bucketmover_common.h" +#include <vespa/searchcore/proton/server/documentbucketmover.h> #include <vespa/vespalib/gtest/gtest.h> #include <vespa/log/log.h> diff --git a/searchcore/src/tests/proton/documentdb/documentbucketmover/scaniterator_test.cpp b/searchcore/src/tests/proton/documentdb/documentbucketmover/scaniterator_test.cpp index 75f5f7c7427..2c382473fca 100644 --- a/searchcore/src/tests/proton/documentdb/documentbucketmover/scaniterator_test.cpp +++ b/searchcore/src/tests/proton/documentdb/documentbucketmover/scaniterator_test.cpp @@ -1,6 +1,7 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "bucketmover_common.h" +#include <vespa/searchcore/proton/bucketdb/bucketscaniterator.h> #include <vespa/vespalib/gtest/gtest.h> #include <vespa/log/log.h> diff --git a/searchcore/src/vespa/searchcore/proton/bucketdb/bucketscaniterator.h b/searchcore/src/vespa/searchcore/proton/bucketdb/bucketscaniterator.h index a437230ed0f..5c1f224c1e7 100644 --- a/searchcore/src/vespa/searchcore/proton/bucketdb/bucketscaniterator.h +++ b/searchcore/src/vespa/searchcore/proton/bucketdb/bucketscaniterator.h @@ -26,9 +26,7 @@ private: public: enum class Pass {FIRST, SECOND}; ScanIterator(BucketDBOwner::Guard db, Pass pass, BucketId lastBucket, BucketId endBucket); - ScanIterator(BucketDBOwner::Guard db, BucketId bucket); - ScanIterator(const ScanIterator &) = delete; ScanIterator(ScanIterator &&rhs); ScanIterator &operator=(const ScanIterator &) = delete; diff --git a/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp b/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp index 1f2b0cf1a75..7c1b3b67f30 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp @@ -8,135 +8,138 @@ #include <vespa/searchcore/proton/feedoperation/moveoperation.h> #include 
<vespa/searchcore/proton/persistenceengine/i_document_retriever.h> #include <vespa/document/fieldvalue/document.h> +#include <vespa/vespalib/util/destructor_callbacks.h> using document::BucketId; using document::Document; using document::GlobalId; using storage::spi::Timestamp; -namespace proton { +namespace proton::bucketdb { typedef IDocumentMetaStore::Iterator Iterator; -bool -DocumentBucketMover::moveDocument(uint32_t lid, const document::GlobalId &gid, Timestamp timestamp) -{ - if ( _source->lidNeedsCommit(lid) ) { - return false; +MoveOperation::UP +BucketMover::createMoveOperation(const MoveKey &key) { + if (_source->lidNeedsCommit(key._lid)) { + return {}; } - Document::SP doc(_source->retriever()->getFullDocument(lid)); - if (!doc || doc->getId().getGlobalId() != gid) - return true; // Failed to retrieve document, removed or changed identity - // TODO(geirst): what if doc is NULL? + Document::SP doc(_source->retriever()->getFullDocument(key._lid)); + if (!doc || doc->getId().getGlobalId() != key._gid) + return {}; // Failed to retrieve document, removed or changed identity BucketId bucketId = _bucket.stripUnused(); - MoveOperation op(bucketId, timestamp, doc, DbDocumentId(_source->sub_db_id(), lid), _targetSubDbId); + return std::make_unique<MoveOperation>(bucketId, key._timestamp, std::move(doc), + DbDocumentId(_source->sub_db_id(), key._lid), + _targetSubDbId); +} +void +BucketMover::moveDocument(MoveOperationUP moveOp, IDestructorCallbackSP onDone) { // We cache the bucket for the document we are going to move to avoid getting // inconsistent bucket info (getBucketInfo()) while moving between ready and not-ready // sub dbs as the bucket info is not updated atomically in this case. 
- _bucketDb->takeGuard()->cacheBucket(bucketId); - _handler->handleMove(op, _limiter.beginOperation()); + _bucketDb->takeGuard()->cacheBucket(moveOp->getBucketId()); + _handler->handleMove(*moveOp, std::move(onDone)); _bucketDb->takeGuard()->uncacheBucket(); - return true; } -DocumentBucketMover::DocumentBucketMover(IMoveOperationLimiter &limiter) - : _limiter(limiter), - _bucket(), - _source(nullptr), - _targetSubDbId(0), - _handler(nullptr), - _bucketDb(nullptr), - _bucketDone(true), +BucketMover::BucketMover(const BucketId &bucket, const MaintenanceDocumentSubDB *source, uint32_t targetSubDbId, + IDocumentMoveHandler &handler,BucketDBOwner &bucketDb) noexcept + : _source(source), + _handler(&handler), + _bucketDb(&bucketDb), + _bucket(bucket), + _targetSubDbId(targetSubDbId), + _bucketDone(false), _lastGid(), _lastGidValid(false) { } - -void -DocumentBucketMover::setupForBucket(const BucketId &bucket, - const MaintenanceDocumentSubDB *source, - uint32_t targetSubDbId, - IDocumentMoveHandler &handler, - BucketDBOwner &bucketDb) -{ - _bucket = bucket; - _source = source; - _targetSubDbId = targetSubDbId; - _handler = &handler; - _bucketDb = &bucketDb; - _bucketDone = false; - _lastGid = GlobalId(); - _lastGidValid = false; +std::pair<std::vector<BucketMover::MoveKey>, bool> +BucketMover::getKeysToMove(size_t maxDocsToMove) const { + std::pair<std::vector<BucketMover::MoveKey>, bool> result; + Iterator itr = (_lastGidValid ? 
_source->meta_store()->upperBound(_lastGid) + : _source->meta_store()->lowerBound(_bucket)); + const Iterator end = _source->meta_store()->upperBound(_bucket); + std::vector<MoveKey> toMove; + for (size_t docsMoved(0); itr != end && docsMoved < maxDocsToMove; ++itr) { + uint32_t lid = itr.getKey().get_lid(); + const RawDocumentMetaData &metaData = _source->meta_store()->getRawMetaData(lid); + if (metaData.getBucketUsedBits() == _bucket.getUsedBits()) { + result.first.emplace_back(lid, metaData.getGid(), metaData.getTimestamp()); + ++docsMoved; + } + } + result.second = (itr == end); + return result; } - -namespace { - -class MoveKey -{ -public: - uint32_t _lid; - document::GlobalId _gid; - Timestamp _timestamp; - - MoveKey(uint32_t lid, const document::GlobalId &gid, Timestamp timestamp) - : _lid(lid), - _gid(gid), - _timestamp(timestamp) - { +std::vector<MoveOperation::UP> +BucketMover::createMoveOperations(const std::vector<MoveKey> &toMove) { + std::vector<MoveOperation::UP> successfulReads; + successfulReads.reserve(toMove.size()); + for (const MoveKey &key : toMove) { + auto moveOp = createMoveOperation(key); + if (!moveOp) { + break; + } + successfulReads.push_back(std::move(moveOp)); } -}; - + return successfulReads; } void -DocumentBucketMover::setBucketDone() { - _bucketDone = true; +BucketMover::moveDocuments(std::vector<MoveOperation::UP> moveOps, IDestructorCallbackSP onDone) { + for (auto & moveOp : moveOps) { + moveDocument(std::move(moveOp), onDone); + } } bool -DocumentBucketMover::moveDocuments(size_t maxDocsToMove) +BucketMover::moveDocuments(size_t maxDocsToMove, IMoveOperationLimiter &limiter) { if (_bucketDone) { return true; } - Iterator itr = (_lastGidValid ? 
_source->meta_store()->upperBound(_lastGid) - : _source->meta_store()->lowerBound(_bucket)); - const Iterator end = _source->meta_store()->upperBound(_bucket); - size_t docsMoved = 0; - size_t docsSkipped = 0; // In absence of a proper cost metric - typedef std::vector<MoveKey> MoveVec; - MoveVec toMove; - for (; itr != end && docsMoved < maxDocsToMove; ++itr) { - uint32_t lid = itr.getKey().get_lid(); - const RawDocumentMetaData &metaData = _source->meta_store()->getRawMetaData(lid); - if (metaData.getBucketUsedBits() != _bucket.getUsedBits()) { - ++docsSkipped; - if (docsSkipped >= 50) { - ++docsMoved; // In absence of a proper cost metric - docsSkipped = 0; - } - } else { - // moveDocument(lid, metaData.getTimestamp()); - toMove.push_back(MoveKey(lid, metaData.getGid(), metaData.getTimestamp())); - ++docsMoved; - } - } - bool done = (itr == end); - for (const MoveKey & key : toMove) { - if ( ! moveDocument(key._lid, key._gid, key._timestamp)) { - return false; - } - _lastGid = key._gid; - _lastGidValid = true; - } - if (done) { + auto [keys, done] = getKeysToMove(maxDocsToMove); + auto moveOps = createMoveOperations(keys); + bool allOk = keys.size() == moveOps.size(); + if (done && allOk) { setBucketDone(); } - return true; + if (moveOps.empty()) return allOk; + + updateLastValidGid(moveOps.back()->getDocument()->getId().getGlobalId()); + std::vector<IDestructorCallbackSP> opTrackers; + for (size_t i(0); i < moveOps.size(); i++) { + opTrackers.push_back(limiter.beginOperation()); + } + moveDocuments(std::move(moveOps), std::make_shared<vespalib::KeepAlive<std::vector<IDestructorCallbackSP>>>(opTrackers)); + return allOk; } +} + +namespace proton { -} // namespace proton +using bucketdb::BucketMover; + +DocumentBucketMover::DocumentBucketMover(IMoveOperationLimiter &limiter) noexcept + : _limiter(limiter), + _impl() +{} + +void +DocumentBucketMover::setupForBucket(const document::BucketId &bucket, const MaintenanceDocumentSubDB *source, + uint32_t targetSubDbId, 
IDocumentMoveHandler &handler, BucketDBOwner &bucketDb) +{ + _impl = std::make_unique<BucketMover>(bucket, source, targetSubDbId, handler, bucketDb); +} + +bool +DocumentBucketMover::moveDocuments(size_t maxDocsToMove) { + return !_impl || _impl->moveDocuments(maxDocsToMove, _limiter); +} + +} diff --git a/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.h b/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.h index 80e1811268b..6b2cda497dc 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.h +++ b/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.h @@ -6,48 +6,113 @@ #include <vespa/document/base/globalid.h> #include <persistence/spi/types.h> +namespace vespalib { class IDestructorCallback; } + namespace proton { class BucketDBOwner; struct IDocumentMoveHandler; struct IMoveOperationLimiter; class MaintenanceDocumentSubDB; +class MoveOperation; -/** - * Class used to move all documents in a bucket from a source sub database - * to a target sub database. The actual moving is handled by a given instance - * of IDocumentMoveHandler. - */ -class DocumentBucketMover +namespace bucketdb { + /** + * Class used to move all documents in a bucket from a source sub database + * to a target sub database. The actual moving is handled by a given instance + * of IDocumentMoveHandler. 
+ */ +class BucketMover { +public: + using MoveOperationUP = std::unique_ptr<MoveOperation>; + using IDestructorCallback = vespalib::IDestructorCallback; + using IDestructorCallbackSP = std::shared_ptr<IDestructorCallback>; + struct MoveKey + { + using Timestamp = storage::spi::Timestamp; + MoveKey(uint32_t lid, const document::GlobalId &gid, Timestamp timestamp) noexcept + : _lid(lid), + _gid(gid), + _timestamp(timestamp) + { } + + uint32_t _lid; + document::GlobalId _gid; + Timestamp _timestamp; + }; + + BucketMover(const document::BucketId &bucket, const MaintenanceDocumentSubDB *source, uint32_t targetSubDbId, + IDocumentMoveHandler &handler, BucketDBOwner &bucketDb) noexcept; + BucketMover(BucketMover &&) noexcept = default; + BucketMover & operator=(BucketMover &&) noexcept = delete; + BucketMover(const BucketMover &) = delete; + BucketMover & operator=(const BucketMover &) = delete; + + // TODO remove once we have swaitched bucket move job + bool moveDocuments(size_t maxDocsToMove, IMoveOperationLimiter &limiter); + + /// Must be called in master thread + std::pair<std::vector<MoveKey>, bool> getKeysToMove(size_t maxDocsToMove) const; + /// Call from any thread + std::vector<MoveOperationUP> createMoveOperations(const std::vector<MoveKey> & toMove); + /// Must be called in master thread + void moveDocuments(std::vector<MoveOperationUP> moveOps, IDestructorCallbackSP onDone); + + const document::BucketId &getBucket() const { return _bucket; } + void cancel() { setBucketDone(); } + void setBucketDone() { _bucketDone = true; } + bool bucketDone() const { return _bucketDone; } + const MaintenanceDocumentSubDB * getSource() const { return _source; } + /// Must be called in master thread + void updateLastValidGid(const document::GlobalId &gid) { + _lastGid = gid; + _lastGidValid = true; + } private: - IMoveOperationLimiter &_limiter; - document::BucketId _bucket; const MaintenanceDocumentSubDB *_source; - uint32_t _targetSubDbId; IDocumentMoveHandler *_handler; 
BucketDBOwner *_bucketDb; + const document::BucketId _bucket; + const uint32_t _targetSubDbId; + bool _bucketDone; document::GlobalId _lastGid; bool _lastGidValid; - bool moveDocument(uint32_t lid, - const document::GlobalId &gid, - storage::spi::Timestamp timestamp); + void moveDocument(MoveOperationUP moveOp, IDestructorCallbackSP onDone); + MoveOperationUP createMoveOperation(const MoveKey & key); +}; +} - void setBucketDone(); +/** + * Class used to move all documents in a bucket from a source sub database + * to a target sub database. The actual moving is handled by a given instance + * of IDocumentMoveHandler. + */ +class DocumentBucketMover +{ +private: + IMoveOperationLimiter &_limiter; + std::unique_ptr<bucketdb::BucketMover> _impl; public: - DocumentBucketMover(IMoveOperationLimiter &limiter); + DocumentBucketMover(IMoveOperationLimiter &limiter) noexcept; + DocumentBucketMover(DocumentBucketMover &&) noexcept = default; + DocumentBucketMover & operator=(DocumentBucketMover &&) noexcept = delete; + DocumentBucketMover(const DocumentBucketMover &) = delete; + DocumentBucketMover & operator=(const DocumentBucketMover &) = delete; void setupForBucket(const document::BucketId &bucket, const MaintenanceDocumentSubDB *source, uint32_t targetSubDbId, IDocumentMoveHandler &handler, BucketDBOwner &bucketDb); - const document::BucketId &getBucket() const { return _bucket; } + const document::BucketId &getBucket() const { return _impl->getBucket(); } bool moveDocuments(size_t maxDocsToMove); - void cancel() { setBucketDone(); } - bool bucketDone() const { return _bucketDone; } - const MaintenanceDocumentSubDB * getSource() const { return _source; } + void cancel() { _impl->cancel(); } + bool bucketDone() const { + return !_impl || _impl->bucketDone(); + } + const MaintenanceDocumentSubDB * getSource() const { return _impl->getSource(); } }; } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/idocumentmovehandler.h 
b/searchcore/src/vespa/searchcore/proton/server/idocumentmovehandler.h index e2faaae3ac9..253b2173a81 100644 --- a/searchcore/src/vespa/searchcore/proton/server/idocumentmovehandler.h +++ b/searchcore/src/vespa/searchcore/proton/server/idocumentmovehandler.h @@ -16,9 +16,8 @@ class MoveOperation; struct IDocumentMoveHandler { virtual void handleMove(MoveOperation &op, std::shared_ptr<vespalib::IDestructorCallback> moveDoneCtx) = 0; - virtual ~IDocumentMoveHandler() {} + virtual ~IDocumentMoveHandler() = default; }; - } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.cpp b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.cpp index acb81ef6976..7557f19412d 100644 --- a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.cpp @@ -29,7 +29,8 @@ CompactionJob::scanDocuments(const LidUsageStats &stats) if (document.valid()) { Bucket metaBucket(document::Bucket(_bucketSpace, document.bucketId)); IDestructorCallback::SP context = getLimiter().beginOperation(); - auto failed = _bucketExecutor.execute(metaBucket, makeBucketTask([this, meta=document, opsTracker=std::move(context)] (const Bucket & bucket, std::shared_ptr<IDestructorCallback> onDone) { + auto failed = _bucketExecutor.execute(metaBucket, makeBucketTask([this, meta=document, opsTracker=std::move(context)] + (const Bucket & bucket, std::shared_ptr<IDestructorCallback> onDone) { assert(bucket.getBucketId() == meta.bucketId); using DoneContext = vespalib::KeepAlive<std::pair<IDestructorCallback::SP, IDestructorCallback::SP>>; moveDocument(meta, std::make_shared<DoneContext>(std::make_pair(std::move(opsTracker), std::move(onDone)))); @@ -55,6 +56,7 @@ namespace { std::atomic<size_t> & _count; }; } + void CompactionJob::moveDocument(const search::DocumentMetaData & meta, std::shared_ptr<IDestructorCallback> context) { 
IncOnDestruct countGuard(_executedCount); |