diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2021-03-08 00:19:02 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-03-08 00:19:02 +0100 |
commit | e6637fd3c050bce8fee3400e61eb49764f7db8e9 (patch) | |
tree | 15e34a58ca02e0cf346f69435ea5b0b8efcf90b6 | |
parent | 8ac16cc9496e41c2f721c069ecf16a035eaa7161 (diff) | |
parent | 10b0564947190d606d708551194306f00e5a9348 (diff) |
Merge pull request #16828 from vespa-engine/balder/verify-disconnected-movers
- considerBucket calls cancel already, no need for explicit cancel in…
-rw-r--r-- | searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp | 39 |
1 files changed, 19 insertions, 20 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp index 8b405a85bf8..184a8304cb4 100644 --- a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp @@ -53,8 +53,6 @@ blockedDueToClusterState(const std::shared_ptr<IBucketStateCalculator> &calc) return !(clusterUp && nodeUp && !nodeInitializing); } -constexpr BucketId RECOMPUTE_TOKEN; - } BucketMoveJobV2::BucketMoveJobV2(const std::shared_ptr<IBucketStateCalculator> &calc, @@ -204,7 +202,6 @@ BucketMoveJobV2::failOperation(BucketId bucketId) { if (_stopped.load(std::memory_order_relaxed)) return; _master.execute(makeLambdaTask([this, bucketId]() { if (_stopped.load(std::memory_order_relaxed)) return; - cancelBucket(bucketId); considerBucket(_ready.meta_store()->getBucketDB().takeGuard(), bucketId); })); } @@ -251,30 +248,35 @@ BucketMoveJobV2::checkIfMoverComplete(const BucketMover & mover) { bool needReschedule = mover.needReschedule(); if (bucketMoveComplete || needReschedule) { BucketId bucket = mover.getBucket(); - assert(needReschedule || _bucketsInFlight.contains(bucket)); - _bucketsInFlight.erase(bucket); + auto found = _bucketsInFlight.find(bucket); if (needReschedule) { - _movers.erase(std::remove_if(_movers.begin(), _movers.end(), - [bucket](const BucketMoverSP &cand) { return cand->getBucket() == bucket; }), - _movers.end()); - _postponedUntilSafe.insert(bucket); + if ((found != _bucketsInFlight.end()) && (&mover == found->second.get())) { + //Prevent old disconnected mover from creating havoc. + _bucketsInFlight.erase(found); + _movers.erase(std::remove_if(_movers.begin(), _movers.end(), + [bucket](const BucketMoverSP &cand) { + return cand->getBucket() == bucket; + }), + _movers.end()); + _postponedUntilSafe.insert(bucket); + return true; + } } else { + assert(found != _bucketsInFlight.end()); + _bucketsInFlight.erase(found); _modifiedHandler.notifyBucketModified(bucket); + return true; } - return true; } return _bucketsInFlight.empty(); } + void BucketMoveJobV2::checkForReschedule(const bucketdb::Guard & guard, BucketId bucket) { if (_postponedUntilSafe.contains(bucket)) { _postponedUntilSafe.erase(bucket); reconsiderBucket(guard, bucket); } - if (_bucketsInFlight.empty() && _postponedUntilSafe.contains(RECOMPUTE_TOKEN)) { - _postponedUntilSafe.erase(RECOMPUTE_TOKEN); - recompute(guard); - } updatePending(); } @@ -415,6 +417,7 @@ BucketMoveJobV2::backFillMovers() { auto mover = greedyCreateMover(); _movers.push_back(mover); auto bucketId = mover->getBucket(); + assert( ! _bucketsInFlight.contains(bucketId)); _bucketsInFlight[bucketId] = std::move(mover); } updatePending(); @@ -431,12 +434,8 @@ BucketMoveJobV2::notifyClusterStateChanged(const std::shared_ptr<IBucketStateCal unBlock(BlockedReason::CLUSTER_STATE); _movers.clear(); std::for_each(_bucketsInFlight.begin(), _bucketsInFlight.end(), [](auto & entry) { entry.second->cancel();}); - std::erase_if(_bucketsInFlight, [](auto & entry) { return entry.second->inSync(); }); - if (_bucketsInFlight.empty()) { - recompute(_ready.meta_store()->getBucketDB().takeGuard()); - } else { - _postponedUntilSafe.insert(RECOMPUTE_TOKEN); - } + _bucketsInFlight.clear(); + recompute(_ready.meta_store()->getBucketDB().takeGuard()); } } |