aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2021-03-08 00:19:02 +0100
committerGitHub <noreply@github.com>2021-03-08 00:19:02 +0100
commite6637fd3c050bce8fee3400e61eb49764f7db8e9 (patch)
tree15e34a58ca02e0cf346f69435ea5b0b8efcf90b6
parent8ac16cc9496e41c2f721c069ecf16a035eaa7161 (diff)
parent10b0564947190d606d708551194306f00e5a9348 (diff)
Merge pull request #16828 from vespa-engine/balder/verify-disconnected-movers
- considerBucket calls cancel already, no need for explicit cancel in…
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp39
1 files changed, 19 insertions, 20 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp
index 8b405a85bf8..184a8304cb4 100644
--- a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp
@@ -53,8 +53,6 @@ blockedDueToClusterState(const std::shared_ptr<IBucketStateCalculator> &calc)
return !(clusterUp && nodeUp && !nodeInitializing);
}
-constexpr BucketId RECOMPUTE_TOKEN;
-
}
BucketMoveJobV2::BucketMoveJobV2(const std::shared_ptr<IBucketStateCalculator> &calc,
@@ -204,7 +202,6 @@ BucketMoveJobV2::failOperation(BucketId bucketId) {
if (_stopped.load(std::memory_order_relaxed)) return;
_master.execute(makeLambdaTask([this, bucketId]() {
if (_stopped.load(std::memory_order_relaxed)) return;
- cancelBucket(bucketId);
considerBucket(_ready.meta_store()->getBucketDB().takeGuard(), bucketId);
}));
}
@@ -251,30 +248,35 @@ BucketMoveJobV2::checkIfMoverComplete(const BucketMover & mover) {
bool needReschedule = mover.needReschedule();
if (bucketMoveComplete || needReschedule) {
BucketId bucket = mover.getBucket();
- assert(needReschedule || _bucketsInFlight.contains(bucket));
- _bucketsInFlight.erase(bucket);
+ auto found = _bucketsInFlight.find(bucket);
if (needReschedule) {
- _movers.erase(std::remove_if(_movers.begin(), _movers.end(),
- [bucket](const BucketMoverSP &cand) { return cand->getBucket() == bucket; }),
- _movers.end());
- _postponedUntilSafe.insert(bucket);
+ if ((found != _bucketsInFlight.end()) && (&mover == found->second.get())) {
+ //Prevent old disconnected mover from creating havoc.
+ _bucketsInFlight.erase(found);
+ _movers.erase(std::remove_if(_movers.begin(), _movers.end(),
+ [bucket](const BucketMoverSP &cand) {
+ return cand->getBucket() == bucket;
+ }),
+ _movers.end());
+ _postponedUntilSafe.insert(bucket);
+ return true;
+ }
} else {
+ assert(found != _bucketsInFlight.end());
+ _bucketsInFlight.erase(found);
_modifiedHandler.notifyBucketModified(bucket);
+ return true;
}
- return true;
}
return _bucketsInFlight.empty();
}
+
void
BucketMoveJobV2::checkForReschedule(const bucketdb::Guard & guard, BucketId bucket) {
if (_postponedUntilSafe.contains(bucket)) {
_postponedUntilSafe.erase(bucket);
reconsiderBucket(guard, bucket);
}
- if (_bucketsInFlight.empty() && _postponedUntilSafe.contains(RECOMPUTE_TOKEN)) {
- _postponedUntilSafe.erase(RECOMPUTE_TOKEN);
- recompute(guard);
- }
updatePending();
}
@@ -415,6 +417,7 @@ BucketMoveJobV2::backFillMovers() {
auto mover = greedyCreateMover();
_movers.push_back(mover);
auto bucketId = mover->getBucket();
+ assert( ! _bucketsInFlight.contains(bucketId));
_bucketsInFlight[bucketId] = std::move(mover);
}
updatePending();
@@ -431,12 +434,8 @@ BucketMoveJobV2::notifyClusterStateChanged(const std::shared_ptr<IBucketStateCal
unBlock(BlockedReason::CLUSTER_STATE);
_movers.clear();
std::for_each(_bucketsInFlight.begin(), _bucketsInFlight.end(), [](auto & entry) { entry.second->cancel();});
- std::erase_if(_bucketsInFlight, [](auto & entry) { return entry.second->inSync(); });
- if (_bucketsInFlight.empty()) {
- recompute(_ready.meta_store()->getBucketDB().takeGuard());
- } else {
- _postponedUntilSafe.insert(RECOMPUTE_TOKEN);
- }
+ _bucketsInFlight.clear();
+ recompute(_ready.meta_store()->getBucketDB().takeGuard());
}
}