summaryrefslogtreecommitdiffstats
path: root/searchcore/src
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2021-03-01 16:54:16 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2021-03-01 20:23:01 +0000
commit61cbfbe0dd4b35461c2662812588573e40e7a3cd (patch)
tree478f7d90e49f084649687cab6891d949ff0e73eb /searchcore/src
parent2e743e722ba395dc22755ab22bf2ae2bac2d9522 (diff)
Postpone buckets for reconsideration until it is safe.
Diffstat (limited to 'searchcore/src')
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp98
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h19
2 files changed, 84 insertions, 33 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp
index 0128db681d5..9d036519a6f 100644
--- a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp
@@ -53,6 +53,8 @@ blockedDueToClusterState(const std::shared_ptr<IBucketStateCalculator> &calc)
return !(clusterUp && nodeUp && !nodeInitializing);
}
+constexpr BucketId RECOMPUTE_BUCKETID;
+
}
BucketMoveJobV2::BucketMoveJobV2(const std::shared_ptr<IBucketStateCalculator> &calc,
@@ -84,7 +86,9 @@ BucketMoveJobV2::BucketMoveJobV2(const std::shared_ptr<IBucketStateCalculator> &
_bucketSpace(bucketSpace),
_iterateCount(0),
_movers(),
+ _bucketsInFlight(),
_buckets2Move(),
+ _postponedUntilSafe(),
_stopped(false),
_startedCount(0),
_executedCount(0),
@@ -200,6 +204,10 @@ BucketMoveJobV2::failOperation(BucketId bucketId) {
if (_stopped.load(std::memory_order_relaxed)) return;
_master.execute(makeLambdaTask([this, bucketId]() {
if (_stopped.load(std::memory_order_relaxed)) return;
+ cancelBucket(bucketId);
+ if (_bucketsInFlight.contains(bucketId)) {
+ handleMoveResult(_bucketsInFlight[bucketId]);
+ }
considerBucket(_ready.meta_store()->getBucketDB().takeGuard(), bucketId);
}));
}
@@ -234,34 +242,58 @@ BucketMoveJobV2::prepareMove(BucketMoverSP mover, std::vector<MoveKey> keys, IDe
void
BucketMoveJobV2::completeMove(BucketMoverSP mover, std::vector<GuardedMoveOp> ops, IDestructorCallbackSP onDone) {
mover->moveDocuments(std::move(ops), std::move(onDone));
+ handleMoveResult(std::move(mover));
+}
+
+void
+BucketMoveJobV2::handleMoveResult(BucketMoverSP mover) {
if (mover->bucketDone() && mover->inSync()) {
- _modifiedHandler.notifyBucketModified(mover->getBucket());
+ BucketId bucket = mover->getBucket();
+ assert(_bucketsInFlight.contains(bucket));
+ _modifiedHandler.notifyBucketModified(bucket);
+ _bucketsInFlight.erase(bucket);
+ updatePending();
+ if (_postponedUntilSafe.contains(bucket)) {
+ reconsiderBucket(_ready.meta_store()->getBucketDB().takeGuard(), bucket);
+ }
+ if (_bucketsInFlight.empty() && _postponedUntilSafe.contains(RECOMPUTE_BUCKETID)) {
+ recompute();
+ }
}
}
void
-BucketMoveJobV2::cancelMovesForBucket(BucketId bucket) {
- for (auto itr = _movers.begin(); itr != _movers.end(); itr++) {
- if (bucket == (*itr)->getBucket()) {
- _movers.erase(itr);
- backFillMovers();
- return;
- }
+BucketMoveJobV2::cancelBucket(BucketId bucket) {
+ auto inFlight = _bucketsInFlight.find(bucket);
+ if (inFlight != _bucketsInFlight.end()) {
+ inFlight->second->cancel();
+ _movers.erase(std::remove_if(_movers.begin(), _movers.end(),
+ [bucket](const BucketMoverSP &mover) { return mover->getBucket() == bucket; }),
+ _movers.end());
+ handleMoveResult(std::move(inFlight->second));
}
}
void
-BucketMoveJobV2::considerBucket(const bucketdb::Guard & guard, BucketId bucket)
-{
+BucketMoveJobV2::considerBucket(const bucketdb::Guard & guard, BucketId bucket) {
+ cancelBucket(bucket);
+ if (_bucketsInFlight.contains(bucket)) {
+ _postponedUntilSafe.insert(bucket);
+ } else {
+ reconsiderBucket(guard, bucket);
+ }
+}
+
+void
+BucketMoveJobV2::reconsiderBucket(const bucketdb::Guard & guard, BucketId bucket) {
+ assert( ! _bucketsInFlight.contains(bucket));
ScanIterator itr(guard, bucket);
auto [mustMove, wantReady] = needMove(itr);
if (mustMove) {
_buckets2Move[bucket] = wantReady;
} else {
_buckets2Move.erase(bucket);
- cancelMovesForBucket(bucket);
}
- backFillMovers();
considerRun();
}
@@ -271,10 +303,10 @@ BucketMoveJobV2::notifyCreateBucket(const bucketdb::Guard & guard, const BucketI
considerBucket(guard, bucket);
}
-BucketMoveJobV2::BucketSet
+BucketMoveJobV2::BucketMoveSet
BucketMoveJobV2::computeBuckets2Move()
{
- BucketMoveJobV2::BucketSet toMove;
+ BucketMoveJobV2::BucketMoveSet toMove;
for (ScanIterator itr(_ready.meta_store()->getBucketDB().takeGuard(), BucketId()); itr.valid(); ++itr) {
auto [mustMove, wantReady] = needMove(itr);
if (mustMove) {
@@ -306,7 +338,8 @@ BucketMoveJobV2::greedyCreateMover() {
void
BucketMoveJobV2::moveDocs(size_t maxDocsToMove) {
- if (done()) return;
+ backFillMovers();
+ if (_movers.empty()) return;
// Select mover
size_t index = _iterateCount++ % _movers.size();
@@ -316,13 +349,7 @@ BucketMoveJobV2::moveDocs(size_t maxDocsToMove) {
if (!mover->bucketDone()) {
startMove(mover, maxDocsToMove);
if (mover->bucketDone()) {
- auto next = greedyCreateMover();
- if (next) {
- _movers[index] = next;
- } else {
- _movers.erase(_movers.begin() + index);
- }
- _bucketsPending.store(_movers.size() + _buckets2Move.size(), std::memory_order_relaxed);
+ _movers.erase(_movers.begin() + index);
}
}
}
@@ -337,7 +364,7 @@ BucketMoveJobV2::scanAndMove(size_t maxBuckets2Move, size_t maxDocsToMovePerBuck
bool
BucketMoveJobV2::done() const {
- return _buckets2Move.empty() && _movers.empty() && !isBlocked();
+ return _buckets2Move.empty() && _movers.empty() && _postponedUntilSafe.empty() && !isBlocked();
}
bool
@@ -360,19 +387,22 @@ BucketMoveJobV2::run()
void
BucketMoveJobV2::recompute() {
- _movers.clear();
_buckets2Move = computeBuckets2Move();
- backFillMovers();
+ updatePending();
}
void
BucketMoveJobV2::backFillMovers() {
// Ensure we have enough movers.
while ( ! _buckets2Move.empty() && (_movers.size() < _movers.capacity())) {
- _movers.push_back(greedyCreateMover());
+ auto mover = greedyCreateMover();
+ _movers.push_back(mover);
+ auto bucketId = mover->getBucket();
+ _bucketsInFlight[bucketId] = std::move(mover);
}
- _bucketsPending.store(_movers.size() + _buckets2Move.size(), std::memory_order_relaxed);
+ updatePending();
}
+
void
BucketMoveJobV2::notifyClusterStateChanged(const std::shared_ptr<IBucketStateCalculator> &newCalc)
{
@@ -382,7 +412,14 @@ BucketMoveJobV2::notifyClusterStateChanged(const std::shared_ptr<IBucketStateCal
setBlocked(BlockedReason::CLUSTER_STATE);
} else {
unBlock(BlockedReason::CLUSTER_STATE);
- recompute();
+ _movers.clear();
+ std::for_each(_bucketsInFlight.begin(), _bucketsInFlight.end(), [](auto & entry) { entry.second->cancel();});
+ std::erase_if(_bucketsInFlight, [](auto & entry) { return entry.second->inSync(); });
+ if (_bucketsInFlight.empty()) {
+ recompute();
+ } else {
+ _postponedUntilSafe.insert(RECOMPUTE_BUCKETID);
+ }
}
}
@@ -415,6 +452,11 @@ BucketMoveJobV2::onStop() {
}
void
+BucketMoveJobV2::updatePending() {
+ _bucketsPending.store(_bucketsInFlight.size() + _buckets2Move.size(), std::memory_order_relaxed);
+}
+
+void
BucketMoveJobV2::updateMetrics(DocumentDBTaggedMetrics & metrics) {
// This is an over estimate to ensure we do not count down to zero until everything has been and completed and acked.
metrics.bucketMove.bucketsPending.set(_bucketsPending.load(std::memory_order_relaxed) +
diff --git a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h
index a0bb8fa4d06..0f1c567b1b3 100644
--- a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h
+++ b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h
@@ -9,6 +9,7 @@
#include "iclusterstatechangedhandler.h"
#include <vespa/searchcore/proton/bucketdb/bucketscaniterator.h>
#include <vespa/searchcore/proton/bucketdb/i_bucket_create_listener.h>
+#include <vespa/vespalib/stllike/hash_set.h>
namespace storage::spi { struct BucketExecutor; }
namespace searchcorespi::index { struct IThreadService; }
@@ -46,14 +47,16 @@ private:
using IThreadService = searchcorespi::index::IThreadService;
using BucketId = document::BucketId;
using ScanIterator = bucketdb::ScanIterator;
- using BucketSet = std::map<BucketId, bool>;
+ using BucketMoveSet = std::map<BucketId, bool>;
using NeedResult = std::pair<bool, bool>;
using ActiveState = storage::spi::BucketInfo::ActiveState;
using BucketMover = bucketdb::BucketMover;
using BucketMoverSP = std::shared_ptr<BucketMover>;
- using Movers = std::vector<std::shared_ptr<BucketMover>>;
+ using Bucket2Mover = std::map<BucketId, BucketMoverSP>;
+ using Movers = std::vector<BucketMoverSP>;
using MoveKey = BucketMover::MoveKey;
using GuardedMoveOp = BucketMover::GuardedMoveOp;
+ using BucketSet = vespalib::hash_set<BucketId, BucketId::hash>;
std::shared_ptr<IBucketStateCalculator> _calc;
IDocumentMoveHandler &_moveHandler;
IBucketModifiedHandler &_modifiedHandler;
@@ -64,7 +67,10 @@ private:
const document::BucketSpace _bucketSpace;
size_t _iterateCount;
Movers _movers;
- BucketSet _buckets2Move;
+ Bucket2Mover _bucketsInFlight;
+ BucketMoveSet _buckets2Move;
+ BucketSet _postponedUntilSafe;
+
std::atomic<bool> _stopped;
std::atomic<size_t> _startedCount;
std::atomic<size_t> _executedCount;
@@ -78,13 +84,16 @@ private:
void startMove(BucketMoverSP mover, size_t maxDocsToMove);
void prepareMove(BucketMoverSP mover, std::vector<MoveKey> keysToMove, IDestructorCallbackSP context);
void completeMove(BucketMoverSP mover, std::vector<GuardedMoveOp> keys, IDestructorCallbackSP context);
+ void handleMoveResult(BucketMoverSP mover);
void considerBucket(const bucketdb::Guard & guard, BucketId bucket);
+ void reconsiderBucket(const bucketdb::Guard & guard, BucketId bucket);
+ void updatePending();
+ void cancelBucket(BucketId bucket); // True if something to cancel
NeedResult needMove(const ScanIterator &itr) const;
- BucketSet computeBuckets2Move();
+ BucketMoveSet computeBuckets2Move();
BucketMoverSP createMover(BucketId bucket, bool wantReady);
BucketMoverSP greedyCreateMover();
void backFillMovers();
- void cancelMovesForBucket(BucketId bucket);
void moveDocs(size_t maxDocsToMove);
void failOperation(BucketId bucket);
friend class StartMove;