aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2021-03-08 17:56:37 +0100
committerGitHub <noreply@github.com>2021-03-08 17:56:37 +0100
commit6caf3fd12fc5fdb4dde20756e3f2c9d8c55e57b5 (patch)
tree76209e45de540c29c63e6f9a0c890701c15d5274
parent763972f491565875251e64691eb902dd7035d5e0 (diff)
parent1686c580d09847adfd84232bc74de0e88892f6f2 (diff)
Merge pull request #16829 from vespa-engine/balder/no-need-to-postpone-changes
No need to postpone cleanup as we fail fast with the atomic needResch…
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp28
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h3
2 files changed, 6 insertions, 25 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp
index 184a8304cb4..1c1a77475fc 100644
--- a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp
@@ -86,7 +86,6 @@ BucketMoveJobV2::BucketMoveJobV2(const std::shared_ptr<IBucketStateCalculator> &
_movers(),
_bucketsInFlight(),
_buckets2Move(),
- _postponedUntilSafe(),
_stopped(false),
_startedCount(0),
_executedCount(0),
@@ -199,7 +198,6 @@ private:
void
BucketMoveJobV2::failOperation(BucketId bucketId) {
IncOnDestruct countGuard(_executedCount);
- if (_stopped.load(std::memory_order_relaxed)) return;
_master.execute(makeLambdaTask([this, bucketId]() {
if (_stopped.load(std::memory_order_relaxed)) return;
considerBucket(_ready.meta_store()->getBucketDB().takeGuard(), bucketId);
@@ -213,7 +211,6 @@ BucketMoveJobV2::startMove(BucketMoverSP mover, size_t maxDocsToMove) {
mover->setAllScheduled();
}
if (keys.empty()) return;
- if (_stopped.load(std::memory_order_relaxed)) return;
mover->updateLastValidGid(keys.back()._gid);
Bucket spiBucket(document::Bucket(_bucketSpace, mover->getBucket()));
auto bucketTask = std::make_unique<StartMove>(*this, std::move(mover), std::move(keys), getLimiter().beginOperation());
@@ -225,7 +222,6 @@ void
BucketMoveJobV2::prepareMove(BucketMoverSP mover, std::vector<MoveKey> keys, IDestructorCallbackSP onDone)
{
IncOnDestruct countGuard(_executedCount);
- if (_stopped.load(std::memory_order_relaxed)) return;
auto moveOps = mover->createMoveOperations(std::move(keys));
_master.execute(makeLambdaTask([this, mover=std::move(mover), moveOps=std::move(moveOps), onDone=std::move(onDone)]() mutable {
if (_stopped.load(std::memory_order_relaxed)) return;
@@ -238,7 +234,7 @@ BucketMoveJobV2::completeMove(BucketMoverSP mover, GuardedMoveOps ops, IDestruct
mover->moveDocuments(std::move(ops.success), std::move(onDone));
ops.failed.clear();
if (checkIfMoverComplete(*mover)) {
- checkForReschedule(_ready.meta_store()->getBucketDB().takeGuard(), mover->getBucket());
+ reconsiderBucket(_ready.meta_store()->getBucketDB().takeGuard(), mover->getBucket());
}
}
@@ -258,26 +254,16 @@ BucketMoveJobV2::checkIfMoverComplete(const BucketMover & mover) {
return cand->getBucket() == bucket;
}),
_movers.end());
- _postponedUntilSafe.insert(bucket);
return true;
}
} else {
assert(found != _bucketsInFlight.end());
_bucketsInFlight.erase(found);
_modifiedHandler.notifyBucketModified(bucket);
- return true;
}
}
- return _bucketsInFlight.empty();
-}
-
-void
-BucketMoveJobV2::checkForReschedule(const bucketdb::Guard & guard, BucketId bucket) {
- if (_postponedUntilSafe.contains(bucket)) {
- _postponedUntilSafe.erase(bucket);
- reconsiderBucket(guard, bucket);
- }
updatePending();
+ return false;
}
void
@@ -292,11 +278,8 @@ BucketMoveJobV2::cancelBucket(BucketId bucket) {
void
BucketMoveJobV2::considerBucket(const bucketdb::Guard & guard, BucketId bucket) {
cancelBucket(bucket);
- if (_bucketsInFlight.contains(bucket)) {
- _postponedUntilSafe.insert(bucket);
- } else {
- reconsiderBucket(guard, bucket);
- }
+ assert( !_bucketsInFlight.contains(bucket));
+ reconsiderBucket(guard, bucket);
}
void
@@ -309,6 +292,7 @@ BucketMoveJobV2::reconsiderBucket(const bucketdb::Guard & guard, BucketId bucket
} else {
_buckets2Move.erase(bucket);
}
+ updatePending();
considerRun();
}
@@ -379,7 +363,7 @@ BucketMoveJobV2::scanAndMove(size_t maxBuckets2Move, size_t maxDocsToMovePerBuck
bool
BucketMoveJobV2::done() const {
- return _buckets2Move.empty() && _movers.empty() && _postponedUntilSafe.empty() && !isBlocked();
+ return _buckets2Move.empty() && _movers.empty() && !isBlocked();
}
bool
diff --git a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h
index d7f44ee37b2..620b76ac81c 100644
--- a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h
+++ b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h
@@ -9,7 +9,6 @@
#include "iclusterstatechangedhandler.h"
#include <vespa/searchcore/proton/bucketdb/bucketscaniterator.h>
#include <vespa/searchcore/proton/bucketdb/i_bucket_create_listener.h>
-#include <vespa/vespalib/stllike/hash_set.h>
namespace storage::spi { struct BucketExecutor; }
namespace searchcorespi::index { struct IThreadService; }
@@ -56,7 +55,6 @@ private:
using Movers = std::vector<BucketMoverSP>;
using MoveKey = BucketMover::MoveKey;
using GuardedMoveOps = BucketMover::GuardedMoveOps;
- using BucketSet = vespalib::hash_set<BucketId, BucketId::hash>;
std::shared_ptr<IBucketStateCalculator> _calc;
IDocumentMoveHandler &_moveHandler;
IBucketModifiedHandler &_modifiedHandler;
@@ -69,7 +67,6 @@ private:
Movers _movers;
Bucket2Mover _bucketsInFlight;
BucketMoveSet _buckets2Move;
- BucketSet _postponedUntilSafe;
std::atomic<bool> _stopped;
std::atomic<size_t> _startedCount;