summaryrefslogtreecommitdiffstats
path: root/searchcore
diff options
context:
space:
mode:
Diffstat (limited to 'searchcore')
-rw-r--r--searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp8
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp142
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/bucketmovejob.h42
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp8
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/operationdonecontext.h4
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/removedonecontext.cpp1
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/removedonecontext.h2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/removedonetask.cpp7
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/removedonetask.h8
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp50
11 files changed, 125 insertions, 149 deletions
diff --git a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp
index 609f2413bdc..64398503dfa 100644
--- a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp
@@ -43,7 +43,7 @@ struct ControllerFixtureBase : public ::testing::Test
DummyBucketExecutor _bucketExecutor;
MyMoveHandler _moveHandler;
DocumentDBTaggedMetrics _metrics;
- std::shared_ptr<BucketMoveJobV2> _bmj;
+ std::shared_ptr<BucketMoveJob> _bmj;
MyCountJobRunner _runner;
ControllerFixtureBase(const BlockableMaintenanceJobConfig &blockableConfig, bool storeMoveDoneContexts);
~ControllerFixtureBase();
@@ -125,9 +125,9 @@ ControllerFixtureBase::ControllerFixtureBase(const BlockableMaintenanceJobConfig
_bucketExecutor(4),
_moveHandler(*_bucketDB, storeMoveDoneContexts),
_metrics("test", 1),
- _bmj(BucketMoveJobV2::create(_calc, RetainGuard(_refCount), _moveHandler, _modifiedHandler, _master, _bucketExecutor, _ready._subDb,
- _notReady._subDb, _bucketCreateNotifier,_clusterStateHandler, _bucketHandler,
- _diskMemUsageNotifier, blockableConfig, "test", makeBucketSpace())),
+ _bmj(BucketMoveJob::create(_calc, RetainGuard(_refCount), _moveHandler, _modifiedHandler, _master, _bucketExecutor, _ready._subDb,
+ _notReady._subDb, _bucketCreateNotifier, _clusterStateHandler, _bucketHandler,
+ _diskMemUsageNotifier, blockableConfig, "test", makeBucketSpace())),
_runner(*_bmj)
{
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp b/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp
index a43fd55ba02..6aa78276598 100644
--- a/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp
@@ -53,27 +53,27 @@ blockedDueToClusterState(const std::shared_ptr<IBucketStateCalculator> &calc)
}
-BucketMoveJobV2::BucketMoveJobV2(const std::shared_ptr<IBucketStateCalculator> &calc,
- RetainGuard dbRetainer,
- IDocumentMoveHandler &moveHandler,
- IBucketModifiedHandler &modifiedHandler,
- IThreadService & master,
- BucketExecutor & bucketExecutor,
- const MaintenanceDocumentSubDB &ready,
- const MaintenanceDocumentSubDB &notReady,
- bucketdb::IBucketCreateNotifier &bucketCreateNotifier,
- IClusterStateChangedNotifier &clusterStateChangedNotifier,
- IBucketStateChangedNotifier &bucketStateChangedNotifier,
- IDiskMemUsageNotifier &diskMemUsageNotifier,
- const BlockableMaintenanceJobConfig &blockableConfig,
- const vespalib::string &docTypeName,
- document::BucketSpace bucketSpace)
+BucketMoveJob::BucketMoveJob(const std::shared_ptr<IBucketStateCalculator> &calc,
+ RetainGuard dbRetainer,
+ IDocumentMoveHandler &moveHandler,
+ IBucketModifiedHandler &modifiedHandler,
+ IThreadService & master,
+ BucketExecutor & bucketExecutor,
+ const MaintenanceDocumentSubDB &ready,
+ const MaintenanceDocumentSubDB &notReady,
+ bucketdb::IBucketCreateNotifier &bucketCreateNotifier,
+ IClusterStateChangedNotifier &clusterStateChangedNotifier,
+ IBucketStateChangedNotifier &bucketStateChangedNotifier,
+ IDiskMemUsageNotifier &diskMemUsageNotifier,
+ const BlockableMaintenanceJobConfig &blockableConfig,
+ const vespalib::string &docTypeName,
+ document::BucketSpace bucketSpace)
: BlockableMaintenanceJob("move_buckets." + docTypeName, vespalib::duration::zero(), vespalib::duration::zero(), blockableConfig),
IClusterStateChangedHandler(),
bucketdb::IBucketCreateListener(),
IBucketStateChangedHandler(),
IDiskMemUsageListener(),
- std::enable_shared_from_this<BucketMoveJobV2>(),
+ std::enable_shared_from_this<BucketMoveJob>(),
_calc(calc),
_dbRetainer(std::move(dbRetainer)),
_moveHandler(moveHandler),
@@ -105,7 +105,7 @@ BucketMoveJobV2::BucketMoveJobV2(const std::shared_ptr<IBucketStateCalculator> &
recompute(_ready.meta_store()->getBucketDB().takeGuard());
}
-BucketMoveJobV2::~BucketMoveJobV2()
+BucketMoveJob::~BucketMoveJob()
{
_bucketCreateNotifier.removeListener(this);
_clusterStateChangedNotifier.removeClusterStateChangedHandler(this);
@@ -113,35 +113,35 @@ BucketMoveJobV2::~BucketMoveJobV2()
_diskMemUsageNotifier.removeDiskMemUsageListener(this);
}
-std::shared_ptr<BucketMoveJobV2>
-BucketMoveJobV2::create(const std::shared_ptr<IBucketStateCalculator> &calc,
- RetainGuard dbRetainer,
- IDocumentMoveHandler &moveHandler,
- IBucketModifiedHandler &modifiedHandler,
- IThreadService & master,
- BucketExecutor & bucketExecutor,
- const MaintenanceDocumentSubDB &ready,
- const MaintenanceDocumentSubDB &notReady,
- bucketdb::IBucketCreateNotifier &bucketCreateNotifier,
- IClusterStateChangedNotifier &clusterStateChangedNotifier,
- IBucketStateChangedNotifier &bucketStateChangedNotifier,
- IDiskMemUsageNotifier &diskMemUsageNotifier,
- const BlockableMaintenanceJobConfig &blockableConfig,
- const vespalib::string &docTypeName,
- document::BucketSpace bucketSpace)
+std::shared_ptr<BucketMoveJob>
+BucketMoveJob::create(const std::shared_ptr<IBucketStateCalculator> &calc,
+ RetainGuard dbRetainer,
+ IDocumentMoveHandler &moveHandler,
+ IBucketModifiedHandler &modifiedHandler,
+ IThreadService & master,
+ BucketExecutor & bucketExecutor,
+ const MaintenanceDocumentSubDB &ready,
+ const MaintenanceDocumentSubDB &notReady,
+ bucketdb::IBucketCreateNotifier &bucketCreateNotifier,
+ IClusterStateChangedNotifier &clusterStateChangedNotifier,
+ IBucketStateChangedNotifier &bucketStateChangedNotifier,
+ IDiskMemUsageNotifier &diskMemUsageNotifier,
+ const BlockableMaintenanceJobConfig &blockableConfig,
+ const vespalib::string &docTypeName,
+ document::BucketSpace bucketSpace)
{
- return std::shared_ptr<BucketMoveJobV2>(
- new BucketMoveJobV2(calc, std::move(dbRetainer), moveHandler, modifiedHandler, master, bucketExecutor, ready, notReady,
- bucketCreateNotifier, clusterStateChangedNotifier, bucketStateChangedNotifier,
- diskMemUsageNotifier, blockableConfig, docTypeName, bucketSpace),
+ return std::shared_ptr<BucketMoveJob>(
+ new BucketMoveJob(calc, std::move(dbRetainer), moveHandler, modifiedHandler, master, bucketExecutor, ready, notReady,
+ bucketCreateNotifier, clusterStateChangedNotifier, bucketStateChangedNotifier,
+ diskMemUsageNotifier, blockableConfig, docTypeName, bucketSpace),
[&master](auto job) {
auto failed = master.execute(makeLambdaTask([job]() { delete job; }));
assert(!failed);
});
}
-BucketMoveJobV2::NeedResult
-BucketMoveJobV2::needMove(const ScanIterator &itr) const {
+BucketMoveJob::NeedResult
+BucketMoveJob::needMove(const ScanIterator &itr) const {
NeedResult noMove(false, false);
const bool hasReadyDocs = itr.hasReadyBucketDocs();
const bool hasNotReadyDocs = itr.hasNotReadyBucketDocs();
@@ -176,10 +176,10 @@ BucketMoveJobV2::needMove(const ScanIterator &itr) const {
return {true, wantReady};
}
-class BucketMoveJobV2::StartMove : public storage::spi::BucketTask {
+class BucketMoveJob::StartMove : public storage::spi::BucketTask {
public:
using IDestructorCallbackSP = std::shared_ptr<vespalib::IDestructorCallback>;
- StartMove(std::shared_ptr<BucketMoveJobV2> job, BucketMover::MoveKeys keys, IDestructorCallbackSP opsTracker)
+ StartMove(std::shared_ptr<BucketMoveJob> job, BucketMover::MoveKeys keys, IDestructorCallbackSP opsTracker)
: _job(job),
_keys(std::move(keys)),
_opsTracker(std::move(opsTracker))
@@ -188,22 +188,22 @@ public:
void run(const Bucket &bucket, IDestructorCallbackSP onDone) override {
assert(_keys.mover().getBucket() == bucket.getBucketId());
using DoneContext = vespalib::KeepAlive<std::pair<IDestructorCallbackSP, IDestructorCallbackSP>>;
- BucketMoveJobV2::prepareMove(std::move(_job), std::move(_keys),
- std::make_shared<DoneContext>(std::make_pair(std::move(_opsTracker), std::move(onDone))));
+ BucketMoveJob::prepareMove(std::move(_job), std::move(_keys),
+ std::make_shared<DoneContext>(std::make_pair(std::move(_opsTracker), std::move(onDone))));
}
void fail(const Bucket &bucket) override {
- BucketMoveJobV2::failOperation(std::move(_job), bucket.getBucketId());
+ BucketMoveJob::failOperation(std::move(_job), bucket.getBucketId());
}
private:
- std::shared_ptr<BucketMoveJobV2> _job;
+ std::shared_ptr<BucketMoveJob> _job;
BucketMover::MoveKeys _keys;
IDestructorCallbackSP _opsTracker;
};
void
-BucketMoveJobV2::failOperation(std::shared_ptr<BucketMoveJobV2> job, BucketId bucketId) {
+BucketMoveJob::failOperation(std::shared_ptr<BucketMoveJob> job, BucketId bucketId) {
auto & master = job->_master;
if (job->stopped()) return;
master.execute(makeLambdaTask([job=std::move(job), bucketId]() {
@@ -213,7 +213,7 @@ BucketMoveJobV2::failOperation(std::shared_ptr<BucketMoveJobV2> job, BucketId bu
}
void
-BucketMoveJobV2::startMove(BucketMover & mover, size_t maxDocsToMove) {
+BucketMoveJob::startMove(BucketMover & mover, size_t maxDocsToMove) {
auto [keys, done] = mover.getKeysToMove(maxDocsToMove);
if (done) {
mover.setAllScheduled();
@@ -226,7 +226,7 @@ BucketMoveJobV2::startMove(BucketMover & mover, size_t maxDocsToMove) {
}
void
-BucketMoveJobV2::prepareMove(std::shared_ptr<BucketMoveJobV2> job, BucketMover::MoveKeys keys, IDestructorCallbackSP onDone)
+BucketMoveJob::prepareMove(std::shared_ptr<BucketMoveJob> job, BucketMover::MoveKeys keys, IDestructorCallbackSP onDone)
{
if (job->stopped()) return; //TODO Remove once lidtracker is no longer in use.
auto moveOps = keys.createMoveOperations();
@@ -239,7 +239,7 @@ BucketMoveJobV2::prepareMove(std::shared_ptr<BucketMoveJobV2> job, BucketMover::
}
void
-BucketMoveJobV2::completeMove(GuardedMoveOps ops, IDestructorCallbackSP onDone) {
+BucketMoveJob::completeMove(GuardedMoveOps ops, IDestructorCallbackSP onDone) {
BucketMover & mover = ops.mover();
mover.moveDocuments(std::move(ops.success()), std::move(onDone));
ops.failed().clear();
@@ -249,7 +249,7 @@ BucketMoveJobV2::completeMove(GuardedMoveOps ops, IDestructorCallbackSP onDone)
}
bool
-BucketMoveJobV2::checkIfMoverComplete(const BucketMover & mover) {
+BucketMoveJob::checkIfMoverComplete(const BucketMover & mover) {
bool bucketMoveComplete = mover.allScheduled() && mover.inSync();
bool needReschedule = mover.needReschedule();
if (bucketMoveComplete || needReschedule) {
@@ -277,7 +277,7 @@ BucketMoveJobV2::checkIfMoverComplete(const BucketMover & mover) {
}
void
-BucketMoveJobV2::cancelBucket(BucketId bucket) {
+BucketMoveJob::cancelBucket(BucketId bucket) {
auto inFlight = _bucketsInFlight.find(bucket);
if (inFlight != _bucketsInFlight.end()) {
inFlight->second->cancel();
@@ -286,14 +286,14 @@ BucketMoveJobV2::cancelBucket(BucketId bucket) {
}
void
-BucketMoveJobV2::considerBucket(const bucketdb::Guard & guard, BucketId bucket) {
+BucketMoveJob::considerBucket(const bucketdb::Guard & guard, BucketId bucket) {
cancelBucket(bucket);
assert( !_bucketsInFlight.contains(bucket));
reconsiderBucket(guard, bucket);
}
void
-BucketMoveJobV2::reconsiderBucket(const bucketdb::Guard & guard, BucketId bucket) {
+BucketMoveJob::reconsiderBucket(const bucketdb::Guard & guard, BucketId bucket) {
assert( ! _bucketsInFlight.contains(bucket));
ScanIterator itr(guard, bucket);
auto [mustMove, wantReady] = needMove(itr);
@@ -307,15 +307,15 @@ BucketMoveJobV2::reconsiderBucket(const bucketdb::Guard & guard, BucketId bucket
}
void
-BucketMoveJobV2::notifyCreateBucket(const bucketdb::Guard & guard, const BucketId &bucket)
+BucketMoveJob::notifyCreateBucket(const bucketdb::Guard & guard, const BucketId &bucket)
{
considerBucket(guard, bucket);
}
-BucketMoveJobV2::BucketMoveSet
-BucketMoveJobV2::computeBuckets2Move(const bucketdb::Guard & guard)
+BucketMoveJob::BucketMoveSet
+BucketMoveJob::computeBuckets2Move(const bucketdb::Guard & guard)
{
- BucketMoveJobV2::BucketMoveSet toMove;
+ BucketMoveJob::BucketMoveSet toMove;
for (ScanIterator itr(guard, BucketId()); itr.valid(); ++itr) {
auto [mustMove, wantReady] = needMove(itr);
if (mustMove) {
@@ -326,7 +326,7 @@ BucketMoveJobV2::computeBuckets2Move(const bucketdb::Guard & guard)
}
std::shared_ptr<BucketMover>
-BucketMoveJobV2::createMover(BucketId bucket, bool wantReady) {
+BucketMoveJob::createMover(BucketId bucket, bool wantReady) {
const MaintenanceDocumentSubDB &source(wantReady ? _notReady : _ready);
const MaintenanceDocumentSubDB &target(wantReady ? _ready : _notReady);
LOG(debug, "checkBucket(): mover.setupForBucket(%s, source:%u, target:%u)",
@@ -335,7 +335,7 @@ BucketMoveJobV2::createMover(BucketId bucket, bool wantReady) {
}
std::shared_ptr<BucketMover>
-BucketMoveJobV2::greedyCreateMover() {
+BucketMoveJob::greedyCreateMover() {
if ( ! _buckets2Move.empty()) {
auto next = _buckets2Move.begin();
auto mover = createMover(next->first, next->second);
@@ -346,7 +346,7 @@ BucketMoveJobV2::greedyCreateMover() {
}
void
-BucketMoveJobV2::moveDocs(size_t maxDocsToMove) {
+BucketMoveJob::moveDocs(size_t maxDocsToMove) {
backFillMovers();
if (_movers.empty()) return;
@@ -364,7 +364,7 @@ BucketMoveJobV2::moveDocs(size_t maxDocsToMove) {
}
bool
-BucketMoveJobV2::scanAndMove(size_t maxBuckets2Move, size_t maxDocsToMovePerBucket) {
+BucketMoveJob::scanAndMove(size_t maxBuckets2Move, size_t maxDocsToMovePerBucket) {
for (size_t i(0); i < maxBuckets2Move; i++) {
moveDocs(maxDocsToMovePerBucket);
}
@@ -372,12 +372,12 @@ BucketMoveJobV2::scanAndMove(size_t maxBuckets2Move, size_t maxDocsToMovePerBuck
}
bool
-BucketMoveJobV2::done() const {
+BucketMoveJob::done() const {
return _buckets2Move.empty() && _movers.empty() && !isBlocked();
}
bool
-BucketMoveJobV2::run()
+BucketMoveJob::run()
{
if (isBlocked()) {
return true; // indicate work is done, since node state is bad
@@ -395,17 +395,17 @@ BucketMoveJobV2::run()
}
void
-BucketMoveJobV2::recompute() {
+BucketMoveJob::recompute() {
recompute(_ready.meta_store()->getBucketDB().takeGuard());
}
void
-BucketMoveJobV2::recompute(const bucketdb::Guard & guard) {
+BucketMoveJob::recompute(const bucketdb::Guard & guard) {
_buckets2Move = computeBuckets2Move(guard);
updatePending();
}
void
-BucketMoveJobV2::backFillMovers() {
+BucketMoveJob::backFillMovers() {
// Ensure we have enough movers.
while ( ! _buckets2Move.empty() && (_movers.size() < _movers.capacity())) {
auto mover = greedyCreateMover();
@@ -418,7 +418,7 @@ BucketMoveJobV2::backFillMovers() {
}
void
-BucketMoveJobV2::notifyClusterStateChanged(const std::shared_ptr<IBucketStateCalculator> &newCalc)
+BucketMoveJob::notifyClusterStateChanged(const std::shared_ptr<IBucketStateCalculator> &newCalc)
{
// Called by master write thread
_calc = newCalc;
@@ -434,26 +434,26 @@ BucketMoveJobV2::notifyClusterStateChanged(const std::shared_ptr<IBucketStateCal
}
void
-BucketMoveJobV2::notifyBucketStateChanged(const BucketId &bucketId, BucketInfo::ActiveState)
+BucketMoveJob::notifyBucketStateChanged(const BucketId &bucketId, BucketInfo::ActiveState)
{
// Called by master write thread
considerBucket(_ready.meta_store()->getBucketDB().takeGuard(), bucketId);
}
void
-BucketMoveJobV2::notifyDiskMemUsage(DiskMemUsageState state)
+BucketMoveJob::notifyDiskMemUsage(DiskMemUsageState state)
{
// Called by master write thread
internalNotifyDiskMemUsage(state);
}
void
-BucketMoveJobV2::updatePending() {
+BucketMoveJob::updatePending() {
_bucketsPending.store(_bucketsInFlight.size() + _buckets2Move.size(), std::memory_order_relaxed);
}
void
-BucketMoveJobV2::updateMetrics(DocumentDBTaggedMetrics & metrics) const {
+BucketMoveJob::updateMetrics(DocumentDBTaggedMetrics & metrics) const {
// This is an over estimate to ensure we do not count down to zero until everything has been and completed and acked.
metrics.bucketMove.bucketsPending.set(_bucketsPending.load(std::memory_order_relaxed) +
getLimiter().numPending());
diff --git a/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.h b/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.h
index 7d5dafb33b3..573bdef13ef 100644
--- a/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.h
+++ b/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.h
@@ -36,12 +36,12 @@ namespace bucketdb { class IBucketCreateNotifier; }
* 3 - Actual movement is then done in master thread while still holding bucket lock. Once bucket has fully moved
* bucket modified notification is sent.
*/
-class BucketMoveJobV2 : public BlockableMaintenanceJob,
+class BucketMoveJob : public BlockableMaintenanceJob,
public IClusterStateChangedHandler,
public bucketdb::IBucketCreateListener,
public IBucketStateChangedHandler,
public IDiskMemUsageListener,
- public std::enable_shared_from_this<BucketMoveJobV2>
+ public std::enable_shared_from_this<BucketMoveJob>
{
private:
using BucketExecutor = storage::spi::BucketExecutor;
@@ -79,24 +79,24 @@ private:
IBucketStateChangedNotifier &_bucketStateChangedNotifier;
IDiskMemUsageNotifier &_diskMemUsageNotifier;
- BucketMoveJobV2(const std::shared_ptr<IBucketStateCalculator> &calc,
- RetainGuard dbRetainer,
- IDocumentMoveHandler &moveHandler,
- IBucketModifiedHandler &modifiedHandler,
- IThreadService & master,
- BucketExecutor & bucketExecutor,
- const MaintenanceDocumentSubDB &ready,
- const MaintenanceDocumentSubDB &notReady,
- bucketdb::IBucketCreateNotifier &bucketCreateNotifier,
- IClusterStateChangedNotifier &clusterStateChangedNotifier,
- IBucketStateChangedNotifier &bucketStateChangedNotifier,
- IDiskMemUsageNotifier &diskMemUsageNotifier,
- const BlockableMaintenanceJobConfig &blockableConfig,
- const vespalib::string &docTypeName,
- document::BucketSpace bucketSpace);
+ BucketMoveJob(const std::shared_ptr<IBucketStateCalculator> &calc,
+ RetainGuard dbRetainer,
+ IDocumentMoveHandler &moveHandler,
+ IBucketModifiedHandler &modifiedHandler,
+ IThreadService & master,
+ BucketExecutor & bucketExecutor,
+ const MaintenanceDocumentSubDB &ready,
+ const MaintenanceDocumentSubDB &notReady,
+ bucketdb::IBucketCreateNotifier &bucketCreateNotifier,
+ IClusterStateChangedNotifier &clusterStateChangedNotifier,
+ IBucketStateChangedNotifier &bucketStateChangedNotifier,
+ IDiskMemUsageNotifier &diskMemUsageNotifier,
+ const BlockableMaintenanceJobConfig &blockableConfig,
+ const vespalib::string &docTypeName,
+ document::BucketSpace bucketSpace);
void startMove(BucketMover & mover, size_t maxDocsToMove);
- static void prepareMove(std::shared_ptr<BucketMoveJobV2> job, BucketMover::MoveKeys keys, IDestructorCallbackSP context);
+ static void prepareMove(std::shared_ptr<BucketMoveJob> job, BucketMover::MoveKeys keys, IDestructorCallbackSP context);
void completeMove(GuardedMoveOps moveOps, IDestructorCallbackSP context);
bool checkIfMoverComplete(const BucketMover & mover);
void considerBucket(const bucketdb::Guard & guard, BucketId bucket);
@@ -109,11 +109,11 @@ private:
BucketMoverSP greedyCreateMover();
void backFillMovers();
void moveDocs(size_t maxDocsToMove);
- static void failOperation(std::shared_ptr<BucketMoveJobV2> job, BucketId bucket);
+ static void failOperation(std::shared_ptr<BucketMoveJob> job, BucketId bucket);
void recompute(const bucketdb::Guard & guard);
class StartMove;
public:
- static std::shared_ptr<BucketMoveJobV2>
+ static std::shared_ptr<BucketMoveJob>
create(const std::shared_ptr<IBucketStateCalculator> &calc,
RetainGuard dbRetainer,
IDocumentMoveHandler &moveHandler,
@@ -130,7 +130,7 @@ public:
const vespalib::string &docTypeName,
document::BucketSpace bucketSpace);
- ~BucketMoveJobV2() override;
+ ~BucketMoveJob() override;
bool scanAndMove(size_t maxBuckets2Move, size_t maxDocsToMovePerBucket);
bool done() const;
diff --git a/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp b/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp
index d0581a5b13f..a0ce668294f 100644
--- a/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp
@@ -134,7 +134,7 @@ BucketMover::createMoveOperations(MoveKeys toMove) {
void
BucketMover::moveDocuments(std::vector<GuardedMoveOp> moveOps, IDestructorCallbackSP onDone) {
for (auto & moveOp : moveOps) {
- moveDocument(std::move(moveOp.first), std::move(onDone));
+ moveDocument(std::move(moveOp.first), onDone);
}
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp b/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp
index 215d4721b87..cfe8da18270 100644
--- a/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp
@@ -59,10 +59,10 @@ injectBucketMoveJob(MaintenanceController &controller,
DocumentDBJobTrackers &jobTrackers,
IDiskMemUsageNotifier &diskMemUsageNotifier)
{
- auto bmj = BucketMoveJobV2::create(calc, controller.retainDB(), moveHandler, bucketModifiedHandler, controller.masterThread(),
- bucketExecutor, controller.getReadySubDB(), controller.getNotReadySubDB(),
- bucketCreateNotifier, clusterStateChangedNotifier, bucketStateChangedNotifier,
- diskMemUsageNotifier, config.getBlockableJobConfig(), docTypeName, bucketSpace);
+ auto bmj = BucketMoveJob::create(calc, controller.retainDB(), moveHandler, bucketModifiedHandler, controller.masterThread(),
+ bucketExecutor, controller.getReadySubDB(), controller.getNotReadySubDB(),
+ bucketCreateNotifier, clusterStateChangedNotifier, bucketStateChangedNotifier,
+ diskMemUsageNotifier, config.getBlockableJobConfig(), docTypeName, bucketSpace);
controller.registerJobInMasterThread(trackJob(jobTrackers.getBucketMove(), std::move(bmj)));
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/operationdonecontext.h b/searchcore/src/vespa/searchcore/proton/server/operationdonecontext.h
index e5666daffb6..16b6a320b66 100644
--- a/searchcore/src/vespa/searchcore/proton/server/operationdonecontext.h
+++ b/searchcore/src/vespa/searchcore/proton/server/operationdonecontext.h
@@ -17,10 +17,7 @@ namespace proton {
class OperationDoneContext : public vespalib::IDestructorCallback
{
FeedToken _token;
-protected:
void ack();
- FeedToken steal() { return std::move(_token); }
-
public:
OperationDoneContext(FeedToken token);
@@ -28,5 +25,4 @@ public:
bool hasToken() const { return static_cast<bool>(_token); }
};
-
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/server/removedonecontext.cpp b/searchcore/src/vespa/searchcore/proton/server/removedonecontext.cpp
index 859d8693f6d..b73b3c02120 100644
--- a/searchcore/src/vespa/searchcore/proton/server/removedonecontext.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/removedonecontext.cpp
@@ -22,7 +22,6 @@ RemoveDoneContext::RemoveDoneContext(FeedToken token, IPendingLidTracker::Token
RemoveDoneContext::~RemoveDoneContext()
{
- ack();
if (_task) {
vespalib::Executor::Task::UP res = _executor.execute(std::move(_task));
assert(!res);
diff --git a/searchcore/src/vespa/searchcore/proton/server/removedonecontext.h b/searchcore/src/vespa/searchcore/proton/server/removedonecontext.h
index 485b82dd141..5eea6f7faaf 100644
--- a/searchcore/src/vespa/searchcore/proton/server/removedonecontext.h
+++ b/searchcore/src/vespa/searchcore/proton/server/removedonecontext.h
@@ -14,7 +14,7 @@ struct IDocumentMetaStore;
/**
- * Context class for document removes that acks remove andschedules a
+ * Context class for document removes that acks remove and schedules a
* task when instance is destroyed. Typically a shared pointer to an
* instance is passed around to multiple worker threads that performs
* portions of a larger task before dropping the shared pointer,
diff --git a/searchcore/src/vespa/searchcore/proton/server/removedonetask.cpp b/searchcore/src/vespa/searchcore/proton/server/removedonetask.cpp
index c66a5d949e6..49a7490b263 100644
--- a/searchcore/src/vespa/searchcore/proton/server/removedonetask.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/removedonetask.cpp
@@ -5,17 +5,14 @@
namespace proton {
-RemoveDoneTask::RemoveDoneTask(IDocumentMetaStore &documentMetaStore,
- uint32_t lid)
+RemoveDoneTask::RemoveDoneTask(IDocumentMetaStore &documentMetaStore, uint32_t lid)
: vespalib::Executor::Task(),
_documentMetaStore(documentMetaStore),
_lid(lid)
{
}
-RemoveDoneTask::~RemoveDoneTask()
-{
-}
+RemoveDoneTask::~RemoveDoneTask() = default;
void
RemoveDoneTask::run()
diff --git a/searchcore/src/vespa/searchcore/proton/server/removedonetask.h b/searchcore/src/vespa/searchcore/proton/server/removedonetask.h
index d9059e6ad6a..1aa59c6d37f 100644
--- a/searchcore/src/vespa/searchcore/proton/server/removedonetask.h
+++ b/searchcore/src/vespa/searchcore/proton/server/removedonetask.h
@@ -29,12 +29,10 @@ class RemoveDoneTask : public vespalib::Executor::Task
public:
- RemoveDoneTask(IDocumentMetaStore &documentMetaStore,
- uint32_t lid);
+ RemoveDoneTask(IDocumentMetaStore &documentMetaStore, uint32_t lid);
+ ~RemoveDoneTask() override;
- virtual ~RemoveDoneTask();
-
- virtual void run() override;
+ void run() override;
};
diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp
index 6066fef68d8..df9ff54e29e 100644
--- a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp
@@ -49,9 +49,7 @@ private:
public:
PutDoneContextForMove(FeedToken token, IPendingLidTracker::Token uncommitted,
- std::shared_ptr<const Document> doc,
- uint32_t lid,
- IDestructorCallback::SP moveDoneCtx)
+ std::shared_ptr<const Document> doc, uint32_t lid, IDestructorCallback::SP moveDoneCtx)
: PutDoneContext(std::move(token), std::move(uncommitted),std::move(doc), lid),
_moveDoneCtx(std::move(moveDoneCtx))
{}
@@ -60,25 +58,21 @@ public:
std::shared_ptr<PutDoneContext>
createPutDoneContext(FeedToken token, IPendingLidTracker::Token uncommitted,
- std::shared_ptr<const Document> doc,
- uint32_t lid,
- IDestructorCallback::SP moveDoneCtx)
+ std::shared_ptr<const Document> doc, uint32_t lid, IDestructorCallback::SP moveDoneCtx)
{
std::shared_ptr<PutDoneContext> result;
if (moveDoneCtx) {
result = std::make_shared<PutDoneContextForMove>(std::move(token), std::move(uncommitted),
std::move(doc), lid, std::move(moveDoneCtx));
} else {
- result = std::make_shared<PutDoneContext>(std::move(token), std::move(uncommitted),
- std::move(doc), lid);
+ result = std::make_shared<PutDoneContext>(std::move(token), std::move(uncommitted), std::move(doc), lid);
}
return result;
}
std::shared_ptr<PutDoneContext>
createPutDoneContext(FeedToken token, IPendingLidTracker::Token uncommitted,
- std::shared_ptr<const Document> doc,
- uint32_t lid)
+ std::shared_ptr<const Document> doc, uint32_t lid)
{
return createPutDoneContext(std::move(token), std::move(uncommitted), std::move(doc),
lid, IDestructorCallback::SP());
@@ -106,10 +100,8 @@ private:
public:
RemoveDoneContextForMove(FeedToken token, IPendingLidTracker::Token uncommitted, vespalib::Executor &executor,
- IDocumentMetaStore &documentMetaStore,
- uint32_t lid, IDestructorCallback::SP moveDoneCtx)
- : RemoveDoneContext(std::move(token), std::move(uncommitted), executor,
- documentMetaStore, lid),
+ IDocumentMetaStore &documentMetaStore, uint32_t lid, IDestructorCallback::SP moveDoneCtx)
+ : RemoveDoneContext(std::move(token), std::move(uncommitted), executor, documentMetaStore, lid),
_moveDoneCtx(std::move(moveDoneCtx))
{}
~RemoveDoneContextForMove() override = default;
@@ -117,13 +109,11 @@ public:
std::shared_ptr<RemoveDoneContext>
createRemoveDoneContext(FeedToken token, IPendingLidTracker::Token uncommitted, vespalib::Executor &executor,
- IDocumentMetaStore &documentMetaStore,
- uint32_t lid, IDestructorCallback::SP moveDoneCtx)
+ IDocumentMetaStore &documentMetaStore, uint32_t lid, IDestructorCallback::SP moveDoneCtx)
{
if (moveDoneCtx) {
return std::make_shared<RemoveDoneContextForMove>
- (std::move(token), std::move(uncommitted), executor, documentMetaStore,
- lid, std::move(moveDoneCtx));
+ (std::move(token), std::move(uncommitted), executor, documentMetaStore, lid, std::move(moveDoneCtx));
} else {
return std::make_shared<RemoveDoneContext>
(std::move(token), std::move(uncommitted), executor, documentMetaStore, lid);
@@ -163,8 +153,8 @@ void putMetaData(documentmetastore::IStore &meta_store, const DocumentId & doc_i
const DocumentOperation &op, bool is_removed_doc)
{
documentmetastore::IStore::Result putRes(
- meta_store.put(doc_id.getGlobalId(),
- op.getBucketId(), op.getTimestamp(), op.getSerializedDocSize(), op.getLid(), op.get_prepare_serial_num()));
+ meta_store.put(doc_id.getGlobalId(), op.getBucketId(), op.getTimestamp(),
+ op.getSerializedDocSize(), op.getLid(), op.get_prepare_serial_num()));
if (!putRes.ok()) {
throw IllegalStateException(
make_string("Could not put <lid, gid> pair for %sdocument with id '%s' and gid '%s'",
@@ -312,14 +302,13 @@ StoreOnlyFeedView::internalPut(FeedToken token, const PutOperation &putOp)
if (putOp.changedDbdId() && useDocumentMetaStore(serialNum)) {
_gidToLidChangeHandler.notifyPut(token, docId.getGlobalId(), putOp.getLid(), serialNum);
}
- std::shared_ptr<PutDoneContext> onWriteDone =
- createPutDoneContext(std::move(token), std::move(uncommitted),
- doc, putOp.getLid());
+ auto onWriteDone = createPutDoneContext(std::move(token), std::move(uncommitted), doc, putOp.getLid());
putSummary(serialNum, putOp.getLid(), doc, onWriteDone);
putAttributes(serialNum, putOp.getLid(), *doc, onWriteDone);
putIndexedFields(serialNum, putOp.getLid(), doc, onWriteDone);
}
if (docAlreadyExists && putOp.changedDbdId()) {
+ //TODO, better to have an else than an assert ?
assert(!putOp.getValidDbdId(_params._subDbId));
internalRemove(std::move(token), _pendingLidsForCommit->produce(putOp.getPrevLid()), serialNum,
putOp.getPrevLid(), IDestructorCallback::SP());
@@ -602,6 +591,7 @@ StoreOnlyFeedView::internalRemove(FeedToken token, const RemoveOperationWithDocI
}
if (rmOp.getValidPrevDbdId(_params._subDbId)) {
if (rmOp.changedDbdId()) {
+ //TODO Prefer else over assert ?
assert(!rmOp.getValidDbdId(_params._subDbId));
internalRemove(std::move(token), _pendingLidsForCommit->produce(rmOp.getPrevLid()), serialNum,
rmOp.getPrevLid(), IDestructorCallback::SP());
@@ -630,14 +620,11 @@ StoreOnlyFeedView::internalRemove(FeedToken token, const RemoveOperationWithGid
void
StoreOnlyFeedView::internalRemove(FeedToken token, IPendingLidTracker::Token uncommitted, SerialNum serialNum,
- Lid lid,
- IDestructorCallback::SP moveDoneCtx)
+ Lid lid, IDestructorCallback::SP moveDoneCtx)
{
bool explicitReuseLid = _lidReuseDelayer.delayReuse(lid);
- std::shared_ptr<RemoveDoneContext> onWriteDone;
- onWriteDone = createRemoveDoneContext(std::move(token), std::move(uncommitted),_writeService.master(), _metaStore,
- (explicitReuseLid ? lid : 0u),
- std::move(moveDoneCtx));
+ auto onWriteDone = createRemoveDoneContext(std::move(token), std::move(uncommitted), _writeService.master(), _metaStore,
+ (explicitReuseLid ? lid : 0u), std::move(moveDoneCtx));
removeSummary(serialNum, lid, onWriteDone);
removeAttributes(serialNum, lid, onWriteDone);
removeIndexedFields(serialNum, lid, onWriteDone);
@@ -786,9 +773,8 @@ StoreOnlyFeedView::handleMove(const MoveOperation &moveOp, IDestructorCallback::
if (moveOp.changedDbdId() && useDocumentMetaStore(serialNum)) {
_gidToLidChangeHandler.notifyPut(FeedToken(), docId.getGlobalId(), moveOp.getLid(), serialNum);
}
- std::shared_ptr<PutDoneContext> onWriteDone =
- createPutDoneContext(FeedToken(), _pendingLidsForCommit->produce(moveOp.getLid()),
- doc, moveOp.getLid(), doneCtx);
+ auto onWriteDone = createPutDoneContext(FeedToken(), _pendingLidsForCommit->produce(moveOp.getLid()),
+ doc, moveOp.getLid(), doneCtx);
putSummary(serialNum, moveOp.getLid(), doc, onWriteDone);
putAttributes(serialNum, moveOp.getLid(), *doc, onWriteDone);
putIndexedFields(serialNum, moveOp.getLid(), doc, onWriteDone);