summaryrefslogtreecommitdiffstats
path: root/searchcore
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2021-04-20 21:50:26 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2021-04-20 21:50:26 +0000
commite8cea9f9201406e0af47e3e938d43f489a9bf10f (patch)
treeb1b93356e46f7dcbc110ed98a12eaea19364f767 /searchcore
parent0e05bdd705bb6fca9c5c69bc1c975f7c88416a07 (diff)
- Bring the BucketMover in along with the move keys and the move operations to ensure proper lifetime.
It must outlive the keys and th emove operations to avoid refering random memory or trigger asserts. - Enforce the BucketMover to be a shared_ptr.
Diffstat (limited to 'searchcore')
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp55
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h8
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp61
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/documentbucketmover.h55
4 files changed, 119 insertions, 60 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp
index ef536639635..6cd925d1b3d 100644
--- a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp
@@ -151,19 +151,16 @@ BucketMoveJobV2::needMove(const ScanIterator &itr) const {
class BucketMoveJobV2::StartMove : public storage::spi::BucketTask {
public:
using IDestructorCallbackSP = std::shared_ptr<vespalib::IDestructorCallback>;
- StartMove(std::shared_ptr<BucketMoveJobV2> job, std::shared_ptr<BucketMover> mover,
- std::vector<BucketMover::MoveKey> keys,
- IDestructorCallbackSP opsTracker)
+ StartMove(std::shared_ptr<BucketMoveJobV2> job, BucketMover::MoveKeys keys, IDestructorCallbackSP opsTracker)
: _job(job),
- _mover(std::move(mover)),
_keys(std::move(keys)),
_opsTracker(std::move(opsTracker))
{}
void run(const Bucket &bucket, IDestructorCallbackSP onDone) override {
- assert(_mover->getBucket() == bucket.getBucketId());
+ assert(_keys.mover().getBucket() == bucket.getBucketId());
using DoneContext = vespalib::KeepAlive<std::pair<IDestructorCallbackSP, IDestructorCallbackSP>>;
- BucketMoveJobV2::prepareMove(std::move(_job), std::move(_mover), std::move(_keys),
+ BucketMoveJobV2::prepareMove(std::move(_job), std::move(_keys),
std::make_shared<DoneContext>(std::make_pair(std::move(_opsTracker), std::move(onDone))));
}
@@ -172,10 +169,9 @@ public:
}
private:
- std::shared_ptr<BucketMoveJobV2> _job;
- std::shared_ptr<BucketMover> _mover;
- std::vector<BucketMover::MoveKey> _keys;
- IDestructorCallbackSP _opsTracker;
+ std::shared_ptr<BucketMoveJobV2> _job;
+ BucketMover::MoveKeys _keys;
+ IDestructorCallbackSP _opsTracker;
};
void
@@ -189,37 +185,38 @@ BucketMoveJobV2::failOperation(std::shared_ptr<BucketMoveJobV2> job, BucketId bu
}
void
-BucketMoveJobV2::startMove(BucketMoverSP mover, size_t maxDocsToMove) {
- auto [keys, done] = mover->getKeysToMove(maxDocsToMove);
+BucketMoveJobV2::startMove(BucketMover & mover, size_t maxDocsToMove) {
+ auto [keys, done] = mover.getKeysToMove(maxDocsToMove);
if (done) {
- mover->setAllScheduled();
+ mover.setAllScheduled();
}
if (keys.empty()) return;
- mover->updateLastValidGid(keys.back()._gid);
- Bucket spiBucket(document::Bucket(_bucketSpace, mover->getBucket()));
- auto bucketTask = std::make_unique<StartMove>(shared_from_this(), std::move(mover), std::move(keys), getLimiter().beginOperation());
+ mover.updateLastValidGid(keys.back()._gid);
+ Bucket spiBucket(document::Bucket(_bucketSpace, mover.getBucket()));
+ auto bucketTask = std::make_unique<StartMove>(shared_from_this(), std::move(keys), getLimiter().beginOperation());
_bucketExecutor.execute(spiBucket, std::move(bucketTask));
}
void
-BucketMoveJobV2::prepareMove(std::shared_ptr<BucketMoveJobV2> job, BucketMoverSP mover, std::vector<MoveKey> keys, IDestructorCallbackSP onDone)
+BucketMoveJobV2::prepareMove(std::shared_ptr<BucketMoveJobV2> job, BucketMover::MoveKeys keys, IDestructorCallbackSP onDone)
{
if (job->_stopped) return; //TODO Remove once lidtracker is no longer in use.
- auto moveOps = mover->createMoveOperations(std::move(keys));
+ auto moveOps = keys.createMoveOperations();
auto & master = job->_master;
if (job->_stopped) return;
- master.execute(makeLambdaTask([job=std::move(job), mover=std::move(mover), moveOps=std::move(moveOps), onDone=std::move(onDone)]() mutable {
+ master.execute(makeLambdaTask([job=std::move(job), moveOps=std::move(moveOps), onDone=std::move(onDone)]() mutable {
if (job->_stopped.load(std::memory_order_relaxed)) return;
- job->completeMove(std::move(mover), std::move(moveOps), std::move(onDone));
+ job->completeMove(std::move(moveOps), std::move(onDone));
}));
}
void
-BucketMoveJobV2::completeMove(BucketMoverSP mover, GuardedMoveOps ops, IDestructorCallbackSP onDone) {
- mover->moveDocuments(std::move(ops.success), std::move(onDone));
- ops.failed.clear();
- if (checkIfMoverComplete(*mover)) {
- reconsiderBucket(_ready.meta_store()->getBucketDB().takeGuard(), mover->getBucket());
+BucketMoveJobV2::completeMove(GuardedMoveOps ops, IDestructorCallbackSP onDone) {
+ BucketMover & mover = ops.mover();
+ mover.moveDocuments(std::move(ops.success()), std::move(onDone));
+ ops.failed().clear();
+ if (checkIfMoverComplete(mover)) {
+ reconsiderBucket(_ready.meta_store()->getBucketDB().takeGuard(), mover.getBucket());
}
}
@@ -306,7 +303,7 @@ BucketMoveJobV2::createMover(BucketId bucket, bool wantReady) {
const MaintenanceDocumentSubDB &target(wantReady ? _ready : _notReady);
LOG(debug, "checkBucket(): mover.setupForBucket(%s, source:%u, target:%u)",
bucket.toString().c_str(), source.sub_db_id(), target.sub_db_id());
- return std::make_shared<BucketMover>(bucket, &source, target.sub_db_id(), _moveHandler);
+ return BucketMover::create(bucket, &source, target.sub_db_id(), _moveHandler);
}
std::shared_ptr<BucketMover>
@@ -327,12 +324,12 @@ BucketMoveJobV2::moveDocs(size_t maxDocsToMove) {
// Select mover
size_t index = _iterateCount++ % _movers.size();
- const auto & mover = _movers[index];
+ auto & mover = *_movers[index];
//Move, or reduce movers as we are tailing off
- if (!mover->allScheduled()) {
+ if (!mover.allScheduled()) {
startMove(mover, maxDocsToMove);
- if (mover->allScheduled()) {
+ if (mover.allScheduled()) {
_movers.erase(_movers.begin() + index);
}
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h
index 40cbc98adf7..736ac3a8f40 100644
--- a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h
+++ b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h
@@ -55,7 +55,6 @@ private:
using BucketMoverSP = std::shared_ptr<BucketMover>;
using Bucket2Mover = std::map<BucketId, BucketMoverSP>;
using Movers = std::vector<BucketMoverSP>;
- using MoveKey = BucketMover::MoveKey;
using GuardedMoveOps = BucketMover::GuardedMoveOps;
std::shared_ptr<IBucketStateCalculator> _calc;
IDocumentMoveHandler &_moveHandler;
@@ -93,10 +92,9 @@ private:
const vespalib::string &docTypeName,
document::BucketSpace bucketSpace);
- void startMove(BucketMoverSP mover, size_t maxDocsToMove);
- static void prepareMove(std::shared_ptr<BucketMoveJobV2> job, BucketMoverSP mover,
- std::vector<MoveKey> keysToMove, IDestructorCallbackSP context);
- void completeMove(BucketMoverSP mover, GuardedMoveOps moveOps, IDestructorCallbackSP context);
+ void startMove(BucketMover & mover, size_t maxDocsToMove);
+ static void prepareMove(std::shared_ptr<BucketMoveJobV2> job, BucketMover::MoveKeys keys, IDestructorCallbackSP context);
+ void completeMove(GuardedMoveOps moveOps, IDestructorCallbackSP context);
bool checkIfMoverComplete(const BucketMover & mover);
void considerBucket(const bucketdb::Guard & guard, BucketId bucket);
void reconsiderBucket(const bucketdb::Guard & guard, BucketId bucket);
diff --git a/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp b/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp
index f08cd4d7ab7..d0581a5b13f 100644
--- a/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp
@@ -52,6 +52,26 @@ BucketMover::MoveKey::MoveKey(uint32_t lid, const document::GlobalId &gid, Times
BucketMover::MoveKey::~MoveKey() = default;
+BucketMover::MoveKeys::~MoveKeys() = default;
+
+std::shared_ptr<BucketMover>
+BucketMover::MoveKeys::stealMover() {
+ return std::move(_mover);
+}
+
+BucketMover::GuardedMoveOps
+BucketMover::MoveKeys::createMoveOperations() {
+ auto & mover = *_mover;
+ return mover.createMoveOperations(std::move(*this));
+}
+
+BucketMover::GuardedMoveOps::GuardedMoveOps(std::shared_ptr<BucketMover> mover) noexcept
+ : _mover(std::move(mover)),
+ _success(),
+ _failed()
+{}
+BucketMover::GuardedMoveOps::~GuardedMoveOps() = default;
+
BucketMover::BucketMover(const BucketId &bucket, const MaintenanceDocumentSubDB *source,
uint32_t targetSubDbId, IDocumentMoveHandler &handler) noexcept
: _source(source),
@@ -70,9 +90,9 @@ BucketMover::~BucketMover() {
assert(inSync());
}
-std::pair<std::vector<BucketMover::MoveKey>, bool>
+std::pair<BucketMover::MoveKeys, bool>
BucketMover::getKeysToMove(size_t maxDocsToMove) {
- std::pair<std::vector<BucketMover::MoveKey>, bool> result;
+ std::pair<MoveKeys, bool> result(MoveKeys(shared_from_this()), false);
Iterator itr = (_lastGidValid ? _source->meta_store()->upperBound(_lastGid)
: _source->meta_store()->lowerBound(_bucket));
const Iterator end = _source->meta_store()->upperBound(_bucket);
@@ -81,7 +101,7 @@ BucketMover::getKeysToMove(size_t maxDocsToMove) {
uint32_t lid = itr.getKey().get_lid();
const RawDocumentMetaData &metaData = _source->meta_store()->getRawMetaData(lid);
if (metaData.getBucketUsedBits() == _bucket.getUsedBits()) {
- result.first.emplace_back(lid, metaData.getGid(), metaData.getTimestamp(), MoveGuard(*this));
+ result.first.keys().emplace_back(lid, metaData.getGid(), metaData.getTimestamp(), MoveGuard(*this));
++docsMoved;
}
}
@@ -90,22 +110,22 @@ BucketMover::getKeysToMove(size_t maxDocsToMove) {
}
BucketMover::GuardedMoveOps
-BucketMover::createMoveOperations(std::vector<MoveKey> toMove) {
- GuardedMoveOps moveOps;
- moveOps.success.reserve(toMove.size());
- for (MoveKey &key : toMove) {
- if (moveOps.failed.empty()) {
+BucketMover::createMoveOperations(MoveKeys toMove) {
+ GuardedMoveOps moveOps(toMove.stealMover());
+ moveOps.success().reserve(toMove.size());
+ for (MoveKey &key : toMove.keys()) {
+ if (moveOps.failed().empty()) {
auto moveOp = createMoveOperation(key);
if (moveOp) {
- moveOps.success.emplace_back(std::move(moveOp), std::move(key._guard));
+ moveOps.success().emplace_back(std::move(moveOp), std::move(key._guard));
} else {
- moveOps.failed.push_back(std::move(key._guard));
+ moveOps.failed().push_back(std::move(key._guard));
}
} else {
- moveOps.failed.push_back(std::move(key._guard));
+ moveOps.failed().push_back(std::move(key._guard));
}
}
- if ( ! moveOps.failed.empty()) {
+ if ( ! moveOps.failed().empty()) {
_needReschedule.store(true, std::memory_order_relaxed);
}
return moveOps;
@@ -141,7 +161,7 @@ void
DocumentBucketMover::setupForBucket(const document::BucketId &bucket, const MaintenanceDocumentSubDB *source,
uint32_t targetSubDbId, IDocumentMoveHandler &handler)
{
- _impl = std::make_unique<BucketMover>(bucket, source, targetSubDbId, handler);
+ _impl = BucketMover::create(bucket, source, targetSubDbId, handler);
}
bool
@@ -157,15 +177,15 @@ DocumentBucketMover::moveDocuments(size_t maxDocsToMove, IMoveOperationLimiter &
}
auto [keys, done] = _impl->getKeysToMove(maxDocsToMove);
auto moveOps = _impl->createMoveOperations(std::move(keys));
- bool allOk = moveOps.failed.empty();
+ bool allOk = moveOps.failed().empty();
if (done && allOk) {
_impl->setAllScheduled();
}
- if (moveOps.success.empty()) return allOk;
+ if (moveOps.success().empty()) return allOk;
- _impl->updateLastValidGid(moveOps.success.back().first->getDocument()->getId().getGlobalId());
+ _impl->updateLastValidGid(moveOps.success().back().first->getDocument()->getId().getGlobalId());
- for (auto & moveOp : moveOps.success) {
+ for (auto & moveOp : moveOps.success()) {
// We cache the bucket for the document we are going to move to avoid getting
// inconsistent bucket info (getBucketInfo()) while moving between ready and not-ready
// sub dbs as the bucket info is not updated atomically in this case.
@@ -176,4 +196,11 @@ DocumentBucketMover::moveDocuments(size_t maxDocsToMove, IMoveOperationLimiter &
return allOk;
}
+std::shared_ptr<BucketMover>
+BucketMover::create(const document::BucketId &bucket, const MaintenanceDocumentSubDB *source,
+ uint32_t targetSubDbId, IDocumentMoveHandler &handler)
+{
+ return std::shared_ptr<BucketMover>(new BucketMover(bucket, source, targetSubDbId, handler));
+}
+
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.h b/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.h
index fc7760a4dc4..d150ebf7436 100644
--- a/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.h
+++ b/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.h
@@ -25,7 +25,7 @@ class BucketDBOwner;
* to a target sub database. The actual moving is handled by a given instance
* of IDocumentMoveHandler.
*/
-class BucketMover
+class BucketMover : public std::enable_shared_from_this<BucketMover>
{
public:
using MoveOperationUP = std::unique_ptr<MoveOperation>;
@@ -65,13 +65,48 @@ public:
};
using GuardedMoveOp = std::pair<MoveOperationUP, MoveGuard>;
- struct GuardedMoveOps {
- std::vector<GuardedMoveOp> success;
- std::vector<MoveGuard> failed;
+ class GuardedMoveOps {
+ public:
+ GuardedMoveOps(std::shared_ptr<BucketMover> mover) noexcept;
+ GuardedMoveOps(GuardedMoveOps &&) = default;
+ GuardedMoveOps & operator =(GuardedMoveOps &&) = default;
+ GuardedMoveOps(const GuardedMoveOps &) = delete;
+ GuardedMoveOps & operator = (const GuardedMoveOps &) = delete;
+ ~GuardedMoveOps();
+ std::vector<GuardedMoveOp> & success() { return _success; }
+ std::vector<MoveGuard> & failed() { return _failed; }
+ BucketMover & mover() { return *_mover; }
+ private:
+ // It is important to keep the order so the mover is destructed last
+ std::shared_ptr<BucketMover> _mover;
+ std::vector<GuardedMoveOp> _success;
+ std::vector<MoveGuard> _failed;
};
- BucketMover(const document::BucketId &bucket, const MaintenanceDocumentSubDB *source,
- uint32_t targetSubDbId, IDocumentMoveHandler &handler) noexcept;
+ class MoveKeys {
+ public:
+ MoveKeys(std::shared_ptr<BucketMover> mover) noexcept : _mover(std::move(mover)) {}
+ MoveKeys(MoveKeys &&) noexcept = default;
+ MoveKeys & operator =(MoveKeys &&) noexcept = default;
+ MoveKeys(const MoveKeys &) noexcept = delete;
+ MoveKeys & operator =(const MoveKeys &) noexcept = delete;
+ ~MoveKeys();
+ GuardedMoveOps createMoveOperations();
+ std::shared_ptr<BucketMover> stealMover();
+ std::vector<MoveKey> & keys() { return _keys; }
+ size_t size() const { return _keys.size(); }
+ bool empty() const { return _keys.empty(); }
+ const MoveKey & back() const { return _keys.back(); }
+ const BucketMover & mover() const { return *_mover; }
+ private:
+ // It is important to keep the order so the mover is destructed last
+ std::shared_ptr<BucketMover> _mover;
+ std::vector<MoveKey> _keys;
+ };
+
+ static std::shared_ptr<BucketMover>
+ create(const document::BucketId &bucket, const MaintenanceDocumentSubDB *source,
+ uint32_t targetSubDbId, IDocumentMoveHandler &handler);
BucketMover(BucketMover &&) noexcept = delete;
BucketMover & operator=(BucketMover &&) noexcept = delete;
BucketMover(const BucketMover &) = delete;
@@ -79,9 +114,9 @@ public:
~BucketMover();
/// Must be called in master thread
- std::pair<std::vector<MoveKey>, bool> getKeysToMove(size_t maxDocsToMove);
+ std::pair<MoveKeys, bool> getKeysToMove(size_t maxDocsToMove);
/// Call from any thread
- GuardedMoveOps createMoveOperations(std::vector<MoveKey> toMove);
+ GuardedMoveOps createMoveOperations(MoveKeys toMove);
/// Must be called in master thread
void moveDocuments(std::vector<GuardedMoveOp> moveOps, IDestructorCallbackSP onDone);
void moveDocument(MoveOperationUP moveOp, IDestructorCallbackSP onDone);
@@ -102,6 +137,8 @@ public:
return pending() == 0;
}
private:
+ BucketMover(const document::BucketId &bucket, const MaintenanceDocumentSubDB *source,
+ uint32_t targetSubDbId, IDocumentMoveHandler &handler) noexcept;
const MaintenanceDocumentSubDB *_source;
IDocumentMoveHandler *_handler;
const document::BucketId _bucket;
@@ -130,7 +167,7 @@ class DocumentBucketMover
private:
IMoveOperationLimiter &_limiter;
bucketdb::BucketDBOwner *_bucketDb;
- std::unique_ptr<bucketdb::BucketMover> _impl;
+ std::shared_ptr<bucketdb::BucketMover> _impl;
bool moveDocuments(size_t maxDocsToMove, IMoveOperationLimiter &limiter);
public: