summary | refs | log | tree | commit | diff | stats
path: root/searchcore
diff options
context:
space:
mode:
author: Henning Baldersheim <balder@yahoo-inc.com> 2021-03-06 13:40:02 +0000
committer: Henning Baldersheim <balder@yahoo-inc.com> 2021-03-06 13:40:02 +0000
commit: 923ab4188142075aca66352cb8f7a5bf136606a9 (patch)
tree: 3e2e81d24ff27a4c0cbd62cbf2165fdf9b7b2573 /searchcore
parent: f8adddfb8b1494f5b303279d6bedac2252f42068 (diff)
- Bring the guards along so they are both created and dropped in the master thread.
- Take the bucketdb guard high up so it is evident where we are holding it.
Diffstat (limited to 'searchcore')
-rw-r--r-- searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp 72
-rw-r--r-- searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h 12
-rw-r--r-- searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp 50
-rw-r--r-- searchcore/src/vespa/searchcore/proton/server/documentbucketmover.h 11
4 files changed, 82 insertions, 63 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp
index cef73d2975c..28f5e3c7111 100644
--- a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp
@@ -107,7 +107,7 @@ BucketMoveJobV2::BucketMoveJobV2(const std::shared_ptr<IBucketStateCalculator> &
_clusterStateChangedNotifier.addClusterStateChangedHandler(this);
_bucketStateChangedNotifier.addBucketStateChangedHandler(this);
_diskMemUsageNotifier.addDiskMemUsageListener(this);
- recompute();
+ recompute(_ready.meta_store()->getBucketDB().takeGuard());
}
BucketMoveJobV2::~BucketMoveJobV2()
@@ -205,9 +205,6 @@ BucketMoveJobV2::failOperation(BucketId bucketId) {
_master.execute(makeLambdaTask([this, bucketId]() {
if (_stopped.load(std::memory_order_relaxed)) return;
cancelBucket(bucketId);
- if (_bucketsInFlight.contains(bucketId)) {
- handleMoveResult(_bucketsInFlight[bucketId]);
- }
considerBucket(_ready.meta_store()->getBucketDB().takeGuard(), bucketId);
}));
}
@@ -240,37 +237,42 @@ BucketMoveJobV2::prepareMove(BucketMoverSP mover, std::vector<MoveKey> keys, IDe
}
void
-BucketMoveJobV2::completeMove(BucketMoverSP mover, std::vector<GuardedMoveOp> ops, IDestructorCallbackSP onDone) {
- mover->moveDocuments(std::move(ops), std::move(onDone));
- handleMoveResult(std::move(mover));
+BucketMoveJobV2::completeMove(BucketMoverSP mover, GuardedMoveOps ops, IDestructorCallbackSP onDone) {
+ mover->moveDocuments(std::move(ops.success), std::move(onDone));
+ ops.failed.clear();
+ if (checkIfMoverComplete(*mover)) {
+ checkForReschedule(_ready.meta_store()->getBucketDB().takeGuard(), mover->getBucket());
+ }
}
-void
-BucketMoveJobV2::handleMoveResult(BucketMoverSP mover) {
- bool bucketMoveComplete = mover->allScheduled() && mover->inSync();
- if (bucketMoveComplete || mover->needReschedule()) {
- BucketId bucket = mover->getBucket();
- assert(mover->needReschedule() || _bucketsInFlight.contains(bucket));
+bool
+BucketMoveJobV2::checkIfMoverComplete(const BucketMover & mover) {
+ bool bucketMoveComplete = mover.allScheduled() && mover.inSync();
+ bool needReschedule = mover.needReschedule();
+ if (bucketMoveComplete || needReschedule) {
+ BucketId bucket = mover.getBucket();
+ assert(mover.needReschedule() || _bucketsInFlight.contains(bucket));
_bucketsInFlight.erase(bucket);
- if (mover->needReschedule()) {
- reconsiderBucket(_ready.meta_store()->getBucketDB().takeGuard(), bucket);
- if (!_buckets2Move.contains(bucket)) {
- // It failed, but all was moved anyway
- _modifiedHandler.notifyBucketModified(bucket);
- }
+ if (needReschedule) {
+ _postponedUntilSafe.insert(bucket);
} else {
_modifiedHandler.notifyBucketModified(bucket);
}
- if (_postponedUntilSafe.contains(bucket)) {
- _postponedUntilSafe.erase(bucket);
- reconsiderBucket(_ready.meta_store()->getBucketDB().takeGuard(), bucket);
- }
- if (_bucketsInFlight.empty() && _postponedUntilSafe.contains(RECOMPUTE_TOKEN)) {
- _postponedUntilSafe.erase(RECOMPUTE_TOKEN);
- recompute();
- }
- updatePending();
+ return true;
}
+ return _bucketsInFlight.empty();
+}
+void
+BucketMoveJobV2::checkForReschedule(const bucketdb::Guard & guard, BucketId bucket) {
+ if (_postponedUntilSafe.contains(bucket)) {
+ _postponedUntilSafe.erase(bucket);
+ reconsiderBucket(guard, bucket);
+ }
+ if (_bucketsInFlight.empty() && _postponedUntilSafe.contains(RECOMPUTE_TOKEN)) {
+ _postponedUntilSafe.erase(RECOMPUTE_TOKEN);
+ recompute(guard);
+ }
+ updatePending();
}
void
@@ -281,7 +283,7 @@ BucketMoveJobV2::cancelBucket(BucketId bucket) {
_movers.erase(std::remove_if(_movers.begin(), _movers.end(),
[bucket](const BucketMoverSP &mover) { return mover->getBucket() == bucket; }),
_movers.end());
- handleMoveResult(inFlight->second);
+ checkIfMoverComplete(*inFlight->second);
}
}
@@ -315,10 +317,10 @@ BucketMoveJobV2::notifyCreateBucket(const bucketdb::Guard & guard, const BucketI
}
BucketMoveJobV2::BucketMoveSet
-BucketMoveJobV2::computeBuckets2Move()
+BucketMoveJobV2::computeBuckets2Move(const bucketdb::Guard & guard)
{
BucketMoveJobV2::BucketMoveSet toMove;
- for (ScanIterator itr(_ready.meta_store()->getBucketDB().takeGuard(), BucketId()); itr.valid(); ++itr) {
+ for (ScanIterator itr(guard, BucketId()); itr.valid(); ++itr) {
auto [mustMove, wantReady] = needMove(itr);
if (mustMove) {
toMove[itr.getBucket()] = wantReady;
@@ -398,7 +400,11 @@ BucketMoveJobV2::run()
void
BucketMoveJobV2::recompute() {
- _buckets2Move = computeBuckets2Move();
+ recompute(_ready.meta_store()->getBucketDB().takeGuard());
+}
+void
+BucketMoveJobV2::recompute(const bucketdb::Guard & guard) {
+ _buckets2Move = computeBuckets2Move(guard);
updatePending();
}
@@ -427,7 +433,7 @@ BucketMoveJobV2::notifyClusterStateChanged(const std::shared_ptr<IBucketStateCal
std::for_each(_bucketsInFlight.begin(), _bucketsInFlight.end(), [](auto & entry) { entry.second->cancel();});
std::erase_if(_bucketsInFlight, [](auto & entry) { return entry.second->inSync(); });
if (_bucketsInFlight.empty()) {
- recompute();
+ recompute(_ready.meta_store()->getBucketDB().takeGuard());
} else {
_postponedUntilSafe.insert(RECOMPUTE_TOKEN);
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h
index 0f1c567b1b3..d7f44ee37b2 100644
--- a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h
+++ b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h
@@ -55,7 +55,7 @@ private:
using Bucket2Mover = std::map<BucketId, BucketMoverSP>;
using Movers = std::vector<BucketMoverSP>;
using MoveKey = BucketMover::MoveKey;
- using GuardedMoveOp = BucketMover::GuardedMoveOp;
+ using GuardedMoveOps = BucketMover::GuardedMoveOps;
using BucketSet = vespalib::hash_set<BucketId, BucketId::hash>;
std::shared_ptr<IBucketStateCalculator> _calc;
IDocumentMoveHandler &_moveHandler;
@@ -83,19 +83,21 @@ private:
void startMove(BucketMoverSP mover, size_t maxDocsToMove);
void prepareMove(BucketMoverSP mover, std::vector<MoveKey> keysToMove, IDestructorCallbackSP context);
- void completeMove(BucketMoverSP mover, std::vector<GuardedMoveOp> keys, IDestructorCallbackSP context);
- void handleMoveResult(BucketMoverSP mover);
+ void completeMove(BucketMoverSP mover, GuardedMoveOps moveOps, IDestructorCallbackSP context);
+ bool checkIfMoverComplete(const BucketMover & mover);
+ void checkForReschedule(const bucketdb::Guard & guard, BucketId bucket);
void considerBucket(const bucketdb::Guard & guard, BucketId bucket);
void reconsiderBucket(const bucketdb::Guard & guard, BucketId bucket);
void updatePending();
void cancelBucket(BucketId bucket); // True if something to cancel
NeedResult needMove(const ScanIterator &itr) const;
- BucketMoveSet computeBuckets2Move();
+ BucketMoveSet computeBuckets2Move(const bucketdb::Guard & guard);
BucketMoverSP createMover(BucketId bucket, bool wantReady);
BucketMoverSP greedyCreateMover();
void backFillMovers();
void moveDocs(size_t maxDocsToMove);
void failOperation(BucketId bucket);
+ void recompute(const bucketdb::Guard & guard);
friend class StartMove;
public:
BucketMoveJobV2(const std::shared_ptr<IBucketStateCalculator> &calc,
@@ -117,7 +119,7 @@ public:
bool scanAndMove(size_t maxBuckets2Move, size_t maxDocsToMovePerBucket);
bool done() const;
- void recompute();
+ void recompute(); // Only for testing
bool inSync() const;
bool run() override;
diff --git a/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp b/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp
index e33513df3f1..f08cd4d7ab7 100644
--- a/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp
@@ -20,8 +20,8 @@ namespace proton::bucketdb {
typedef IDocumentMetaStore::Iterator Iterator;
-BucketMover::GuardedMoveOp
-BucketMover::createMoveOperation(MoveKey &key) {
+MoveOperation::UP
+BucketMover::createMoveOperation(const MoveKey &key) {
if (_source->lidNeedsCommit(key._lid)) return {};
const RawDocumentMetaData &metaNow = _source->meta_store()->getRawMetaData(key._lid);
@@ -29,13 +29,13 @@ BucketMover::createMoveOperation(MoveKey &key) {
if (metaNow.getTimestamp() != key._timestamp) return {};
Document::SP doc(_source->retriever()->getFullDocument(key._lid));
- if (!doc || doc->getId().getGlobalId() != key._gid)
- return {}; // Failed to retrieve document, removed or changed identity
+ if (!doc || doc->getId().getGlobalId() != key._gid) {
+ // Failed to retrieve document, removed or changed identity
+ return {};
+ }
BucketId bucketId = _bucket.stripUnused();
- return BucketMover::GuardedMoveOp(std::make_unique<MoveOperation>(bucketId, key._timestamp, std::move(doc),
- DbDocumentId(_source->sub_db_id(), key._lid),
- _targetSubDbId),
- std::move(key._guard));
+ return std::make_unique<MoveOperation>(bucketId, key._timestamp, std::move(doc),
+ DbDocumentId(_source->sub_db_id(), key._lid), _targetSubDbId);
}
void
@@ -89,19 +89,26 @@ BucketMover::getKeysToMove(size_t maxDocsToMove) {
return result;
}
-std::vector<BucketMover::GuardedMoveOp>
+BucketMover::GuardedMoveOps
BucketMover::createMoveOperations(std::vector<MoveKey> toMove) {
- std::vector<GuardedMoveOp> successfulReads;
- successfulReads.reserve(toMove.size());
+ GuardedMoveOps moveOps;
+ moveOps.success.reserve(toMove.size());
for (MoveKey &key : toMove) {
- auto moveOp = createMoveOperation(key);
- if (!moveOp.first) {
- _needReschedule.store(true, std::memory_order_relaxed);
- break;
+ if (moveOps.failed.empty()) {
+ auto moveOp = createMoveOperation(key);
+ if (moveOp) {
+ moveOps.success.emplace_back(std::move(moveOp), std::move(key._guard));
+ } else {
+ moveOps.failed.push_back(std::move(key._guard));
+ }
+ } else {
+ moveOps.failed.push_back(std::move(key._guard));
}
- successfulReads.push_back(std::move(moveOp));
}
- return successfulReads;
+ if ( ! moveOps.failed.empty()) {
+ _needReschedule.store(true, std::memory_order_relaxed);
+ }
+ return moveOps;
}
void
@@ -149,17 +156,16 @@ DocumentBucketMover::moveDocuments(size_t maxDocsToMove, IMoveOperationLimiter &
return true;
}
auto [keys, done] = _impl->getKeysToMove(maxDocsToMove);
- size_t numKeys = keys.size();
auto moveOps = _impl->createMoveOperations(std::move(keys));
- bool allOk = (numKeys == moveOps.size());
+ bool allOk = moveOps.failed.empty();
if (done && allOk) {
_impl->setAllScheduled();
}
- if (moveOps.empty()) return allOk;
+ if (moveOps.success.empty()) return allOk;
- _impl->updateLastValidGid(moveOps.back().first->getDocument()->getId().getGlobalId());
+ _impl->updateLastValidGid(moveOps.success.back().first->getDocument()->getId().getGlobalId());
- for (auto & moveOp : moveOps) {
+ for (auto & moveOp : moveOps.success) {
// We cache the bucket for the document we are going to move to avoid getting
// inconsistent bucket info (getBucketInfo()) while moving between ready and not-ready
// sub dbs as the bucket info is not updated atomically in this case.
diff --git a/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.h b/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.h
index 05ceb4c17e6..fc7760a4dc4 100644
--- a/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.h
+++ b/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.h
@@ -64,6 +64,12 @@ public:
MoveGuard _guard;
};
+ using GuardedMoveOp = std::pair<MoveOperationUP, MoveGuard>;
+ struct GuardedMoveOps {
+ std::vector<GuardedMoveOp> success;
+ std::vector<MoveGuard> failed;
+ };
+
BucketMover(const document::BucketId &bucket, const MaintenanceDocumentSubDB *source,
uint32_t targetSubDbId, IDocumentMoveHandler &handler) noexcept;
BucketMover(BucketMover &&) noexcept = delete;
@@ -72,11 +78,10 @@ public:
BucketMover & operator=(const BucketMover &) = delete;
~BucketMover();
- using GuardedMoveOp = std::pair<MoveOperationUP, MoveGuard>;
/// Must be called in master thread
std::pair<std::vector<MoveKey>, bool> getKeysToMove(size_t maxDocsToMove);
/// Call from any thread
- std::vector<GuardedMoveOp> createMoveOperations(std::vector<MoveKey> toMove);
+ GuardedMoveOps createMoveOperations(std::vector<MoveKey> toMove);
/// Must be called in master thread
void moveDocuments(std::vector<GuardedMoveOp> moveOps, IDestructorCallbackSP onDone);
void moveDocument(MoveOperationUP moveOp, IDestructorCallbackSP onDone);
@@ -108,7 +113,7 @@ private:
bool _allScheduled; // All moves started, or operation has been cancelled
bool _lastGidValid;
document::GlobalId _lastGid;
- GuardedMoveOp createMoveOperation(MoveKey & key);
+ MoveOperationUP createMoveOperation(const MoveKey & key);
size_t pending() const {
return _started.load(std::memory_order_relaxed) - _completed.load(std::memory_order_relaxed);
}