diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2021-02-02 18:12:59 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-02-02 18:12:59 +0100 |
commit | 47985d0aa7aebb49abbcbbea9d81cbf4007e8f00 (patch) | |
tree | 7f3423549a38d2742244b73917107b1009d3afad /searchcore | |
parent | 5c6a38938c5c1e51ebdc1f6a7a4ce6a5a3dc5637 (diff) | |
parent | ffee6d91608f14506f1ebefef788d963718142c1 (diff) |
Merge pull request #16348 from vespa-engine/balder/refactor-bucketmove-job
Factor out the scan iterator from the BucketMoveJob.
Diffstat (limited to 'searchcore')
7 files changed, 98 insertions, 94 deletions
diff --git a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp index cabcd33b2dd..a952efdecdc 100644 --- a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp +++ b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp @@ -44,14 +44,8 @@ using BucketIdSet = std::set<BucketId>; using BucketIdVector = BucketId::List; using DocumentVector = std::vector<Document::SP>; using MoveOperationVector = std::vector<MoveOperation>; -using ScanItr = BucketMoveJob::ScanIterator; - -namespace { - -const uint32_t FIRST_SCAN_PASS = 1; -const uint32_t SECOND_SCAN_PASS = 2; - -} +using ScanItr = bucketdb::ScanIterator; +using ScanPass = ScanItr::Pass; struct MyMoveOperationLimiter : public IMoveOperationLimiter { uint32_t beginOpCount; @@ -346,9 +340,8 @@ struct ScanFixtureBase return ScanItr(_bucketDB->takeGuard(), BucketId()); } - ScanItr getItr(BucketId bucket, BucketId endBucket = BucketId(), uint32_t pass = FIRST_SCAN_PASS) { - return ScanItr(_bucketDB->takeGuard(), pass, - bucket, endBucket); + ScanItr getItr(BucketId bucket, BucketId endBucket = BucketId(), ScanPass pass = ScanPass::FIRST) { + return ScanItr(_bucketDB->takeGuard(), pass, bucket, endBucket); } }; @@ -451,12 +444,12 @@ TEST_F("require that we can iterate from the middle of not ready buckets", ScanF { BucketId bucket = f._notReady.bucket(2); { - ScanItr itr = f.getItr(bucket, bucket, FIRST_SCAN_PASS); + ScanItr itr = f.getItr(bucket, bucket, ScanPass::FIRST); assertEquals(BucketVector(). add(f._notReady.bucket(4)), itr, SubDbType::NOTREADY); } { - ScanItr itr = f.getItr(BucketId(), bucket, SECOND_SCAN_PASS); + ScanItr itr = f.getItr(BucketId(), bucket, ScanPass::SECOND); assertEquals(BucketVector(). add(f._notReady.bucket(2)), itr, SubDbType::NOTREADY); } @@ -478,12 +471,12 @@ TEST_F("require that we can iterate from the middle of ready buckets", ScanFixtu add(f._notReady.bucket(4)), itr, SubDbType::NOTREADY); } { - ScanItr itr = f.getItr(bucket, bucket, FIRST_SCAN_PASS); + ScanItr itr = f.getItr(bucket, bucket, ScanPass::FIRST); assertEquals(BucketVector(). add(f._ready.bucket(8)), itr, SubDbType::READY); } { - ScanItr itr = f.getItr(BucketId(), bucket, SECOND_SCAN_PASS); + ScanItr itr = f.getItr(BucketId(), bucket, ScanPass::SECOND); assertEquals(BucketVector(). add(f._ready.bucket(6)), itr, SubDbType::READY); } diff --git a/searchcore/src/vespa/searchcore/proton/bucketdb/CMakeLists.txt b/searchcore/src/vespa/searchcore/proton/bucketdb/CMakeLists.txt index 6619f9e419d..66a58ad66a3 100644 --- a/searchcore/src/vespa/searchcore/proton/bucketdb/CMakeLists.txt +++ b/searchcore/src/vespa/searchcore/proton/bucketdb/CMakeLists.txt @@ -6,6 +6,7 @@ vespa_add_library(searchcore_bucketdb STATIC bucket_db_owner.cpp bucketdb.cpp bucketdbhandler.cpp + bucketscaniterator.cpp bucketsessionbase.cpp bucketstate.cpp checksumaggregators.cpp diff --git a/searchcore/src/vespa/searchcore/proton/bucketdb/bucketdb.h b/searchcore/src/vespa/searchcore/proton/bucketdb/bucketdb.h index 05388931e20..e6848d095df 100644 --- a/searchcore/src/vespa/searchcore/proton/bucketdb/bucketdb.h +++ b/searchcore/src/vespa/searchcore/proton/bucketdb/bucketdb.h @@ -31,7 +31,7 @@ private: void checkEmpty() const; public: BucketDB(); - virtual ~BucketDB(); + ~BucketDB(); const BucketState & add(const GlobalId &gid, const BucketId &bucketId, const Timestamp ×tamp, uint32_t docSize, diff --git a/searchcore/src/vespa/searchcore/proton/bucketdb/bucketscaniterator.cpp b/searchcore/src/vespa/searchcore/proton/bucketdb/bucketscaniterator.cpp new file mode 100644 index 00000000000..46cbb4fc37f --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/bucketdb/bucketscaniterator.cpp @@ -0,0 +1,29 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "bucketscaniterator.h" + +using document::BucketId; +using storage::spi::BucketInfo; + +namespace proton::bucketdb { + +ScanIterator::ScanIterator(BucketDBOwner::Guard db, Pass pass, BucketId lastBucket, BucketId endBucket) + : _db(std::move(db)), + _itr(lastBucket.isSet() ? _db->upperBound(lastBucket) : _db->begin()), + _end(pass == Pass::SECOND && endBucket.isSet() ? + _db->upperBound(endBucket) : _db->end()) +{ } + +ScanIterator::ScanIterator(BucketDBOwner::Guard db, BucketId bucket) + : _db(std::move(db)), + _itr(_db->lowerBound(bucket)), + _end(_db->end()) +{ } + +ScanIterator::ScanIterator(ScanIterator &&rhs) + : _db(std::move(rhs._db)), + _itr(rhs._itr), + _end(rhs._end) +{ } + +} // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/bucketdb/bucketscaniterator.h b/searchcore/src/vespa/searchcore/proton/bucketdb/bucketscaniterator.h new file mode 100644 index 00000000000..a437230ed0f --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/bucketdb/bucketscaniterator.h @@ -0,0 +1,49 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "bucket_db_owner.h" +#include "bucketdb.h" + +namespace proton::bucketdb { + +struct ScanPosition { + document::BucketId _lastBucket; + + ScanPosition() : _lastBucket() { } + bool validBucket() const { return _lastBucket.isSet(); } +}; + + +class ScanIterator { +private: + using BucketId = document::BucketId; + using BucketIterator = BucketDB::ConstMapIterator; + BucketDBOwner::Guard _db; + BucketIterator _itr; + BucketIterator _end; + +public: + enum class Pass {FIRST, SECOND}; + ScanIterator(BucketDBOwner::Guard db, Pass pass, BucketId lastBucket, BucketId endBucket); + + ScanIterator(BucketDBOwner::Guard db, BucketId bucket); + + ScanIterator(const ScanIterator &) = delete; + ScanIterator(ScanIterator &&rhs); + ScanIterator &operator=(const ScanIterator &) = delete; + ScanIterator &operator=(ScanIterator &&rhs) = delete; + + bool valid() const { return _itr != _end; } + bool isActive() const { return _itr->second.isActive(); } + BucketId getBucket() const { return _itr->first; } + bool hasReadyBucketDocs() const { return _itr->second.getReadyCount() != 0; } + bool hasNotReadyBucketDocs() const { return _itr->second.getNotReadyCount() != 0; } + + ScanIterator & operator++() { + ++_itr; + return *this; + } +}; + +} diff --git a/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp b/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp index cf6ea7f7787..1c642205c86 100644 --- a/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp @@ -21,37 +21,10 @@ namespace proton { namespace { -const uint32_t FIRST_SCAN_PASS = 1; -const uint32_t SECOND_SCAN_PASS = 2; - const char * bool2str(bool v) { return (v ? "T" : "F"); } } -BucketMoveJob::ScanIterator:: -ScanIterator(BucketDBOwner::Guard db, uint32_t pass, BucketId lastBucket, BucketId endBucket) - : _db(std::move(db)), - _itr(lastBucket.isSet() ? _db->upperBound(lastBucket) : _db->begin()), - _end(pass == SECOND_SCAN_PASS && endBucket.isSet() ? - _db->upperBound(endBucket) : _db->end()) -{ -} - -BucketMoveJob::ScanIterator:: -ScanIterator(BucketDBOwner::Guard db, BucketId bucket) - : _db(std::move(db)), - _itr(_db->lowerBound(bucket)), - _end(_db->end()) -{ -} - -BucketMoveJob::ScanIterator::ScanIterator(ScanIterator &&rhs) - : _db(std::move(rhs._db)), - _itr(rhs._itr), - _end(rhs._end) -{ -} - void BucketMoveJob::checkBucket(const BucketId &bucket, ScanIterator &itr, @@ -181,7 +154,7 @@ BucketMoveJob(const IBucketStateCalculator::SP &calc, _mover(getLimiter()), _doneScan(false), _scanPos(), - _scanPass(FIRST_SCAN_PASS), + _scanPass(ScanPass::FIRST), _endPos(), _bucketSpace(bucketSpace), _delayedBuckets(), @@ -279,7 +252,7 @@ BucketMoveJob::changedCalculator() _endPos = _scanPos; } _doneScan = false; - _scanPass = FIRST_SCAN_PASS; + _scanPass = ScanPass::FIRST; maybeCancelMover(_mover); maybeCancelMover(_delayedMover); } @@ -287,9 +260,6 @@ BucketMoveJob::changedCalculator() bool BucketMoveJob::scanAndMove(size_t maxBucketsToScan, size_t maxDocsToMove) { - if (done()) { - return true; - } IFrozenBucketHandler::ExclusiveBucketGuard::UP bucketGuard; // Look for delayed bucket to be processed now while (!_delayedBuckets.empty() && _delayedMover.bucketDone()) { @@ -310,10 +280,10 @@ BucketMoveJob::scanAndMove(size_t maxBucketsToScan, size_t maxDocsToMove) ScanResult res = scanBuckets(maxBucketsToScan - bucketsScanned, bucketGuard); bucketsScanned += res.first; if (res.second) { - if (_scanPass == FIRST_SCAN_PASS && + if (_scanPass == ScanPass::FIRST && _endPos.validBucket()) { _scanPos = ScanPosition(); - _scanPass = SECOND_SCAN_PASS; + _scanPass = ScanPass::SECOND; } else { _doneScan = true; break; @@ -334,7 +304,7 @@ BucketMoveJob::scanAndMove(size_t maxBucketsToScan, size_t maxDocsToMove) bool BucketMoveJob::run() { - if (isBlocked()) { + if (isBlocked() || done()) { return true; // indicate work is done, since node state is bad } /// Returning false here will immediately post the job back on the executor. This will give a busy loop, diff --git a/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.h b/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.h index 26755eca7b1..8a84a10199c 100644 --- a/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.h +++ b/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.h @@ -9,7 +9,7 @@ #include "ibucketstatechangedhandler.h" #include "iclusterstatechangedhandler.h" #include "ifrozenbuckethandler.h" -#include <vespa/searchcore/proton/bucketdb/bucket_db_owner.h> +#include <vespa/searchcore/proton/bucketdb/bucketscaniterator.h> #include <vespa/searchcore/proton/bucketdb/i_bucket_create_listener.h> #include <set> @@ -34,48 +34,10 @@ class BucketMoveJob : public BlockableMaintenanceJob, public IBucketStateChangedHandler, public IDiskMemUsageListener { -public: - struct ScanPosition { - document::BucketId _lastBucket; - - ScanPosition() : _lastBucket() { } - bool validBucket() const { return _lastBucket.isSet(); } - }; - - typedef BucketDB::ConstMapIterator BucketIterator; - - class ScanIterator { - private: - BucketDBOwner::Guard _db; - BucketIterator _itr; - BucketIterator _end; - - public: - ScanIterator(BucketDBOwner::Guard db, - uint32_t pass, - document::BucketId lastBucket, - document::BucketId endBucket); - - ScanIterator(BucketDBOwner::Guard db, document::BucketId bucket); - - ScanIterator(const ScanIterator &) = delete; - ScanIterator(ScanIterator &&rhs); - ScanIterator &operator=(const ScanIterator &) = delete; - ScanIterator &operator=(ScanIterator &&rhs) = delete; - - bool valid() const { return _itr != _end; } - bool isActive() const { return _itr->second.isActive(); } - document::BucketId getBucket() const { return _itr->first; } - bool hasReadyBucketDocs() const { return _itr->second.getReadyCount() != 0; } - bool hasNotReadyBucketDocs() const { return _itr->second.getNotReadyCount() != 0; } - - ScanIterator & operator++() { - ++_itr; - return *this; - } - }; - private: + using ScanPosition = bucketdb::ScanPosition; + using ScanIterator = bucketdb::ScanIterator; + using ScanPass = ScanIterator::Pass; using ScanResult = std::pair<size_t, bool>; std::shared_ptr<IBucketStateCalculator> _calc; IDocumentMoveHandler &_moveHandler; @@ -85,7 +47,7 @@ private: DocumentBucketMover _mover; bool _doneScan; ScanPosition _scanPos; - uint32_t _scanPass; + ScanPass _scanPass; ScanPosition _endPos; document::BucketSpace _bucketSpace; |