summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2021-05-04 20:01:13 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2021-05-05 06:54:53 +0000
commite54133b4b3deda0afa56f0dead5f23c8f314f93c (patch)
tree1457d43c3805053dbd6881f94de05e8eed6962c0
parent1cf3fd15f0939eff93cd6494897f3a1ce9f6c2c0 (diff)
Make handle stop in a uniform way by putting the basics in the base class.
-rw-r--r--searchcore/src/tests/proton/documentdb/job_tracked_maintenance_job/job_tracked_maintenance_job_test.cpp18
-rw-r--r--searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp19
-rw-r--r--searchcore/src/tests/proton/documentdb/move_operation_limiter/move_operation_limiter_test.cpp6
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp18
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/i_maintenance_job.h15
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/job_tracked_maintenance_job.h4
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.cpp17
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.h2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/maintenancejobrunner.cpp2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob_v2.cpp3
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob_v2.h2
12 files changed, 38 insertions, 70 deletions
diff --git a/searchcore/src/tests/proton/documentdb/job_tracked_maintenance_job/job_tracked_maintenance_job_test.cpp b/searchcore/src/tests/proton/documentdb/job_tracked_maintenance_job/job_tracked_maintenance_job_test.cpp
index 4287a6b262a..8862a6d3bb2 100644
--- a/searchcore/src/tests/proton/documentdb/job_tracked_maintenance_job/job_tracked_maintenance_job_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/job_tracked_maintenance_job/job_tracked_maintenance_job_test.cpp
@@ -34,13 +34,11 @@ struct MyMaintenanceJob : public IBlockableMaintenanceJob
GateVector _runGates;
size_t _runIdx;
bool _blocked;
- bool _stopped;
MyMaintenanceJob(size_t numRuns)
: IBlockableMaintenanceJob("myjob", 10s, 20s),
_runGates(getGateVector(numRuns)),
_runIdx(0),
- _blocked(false),
- _stopped(false)
+ _blocked(false)
{}
void block() { setBlocked(BlockedReason::RESOURCE_LIMITS); }
void unBlock() { unBlock(BlockedReason::RESOURCE_LIMITS); }
@@ -51,7 +49,7 @@ struct MyMaintenanceJob : public IBlockableMaintenanceJob
_runGates[_runIdx++]->await(5s);
return _runIdx == _runGates.size();
}
- void onStop() override { _stopped = true; }
+ void onStop() override { }
};
struct Fixture
@@ -65,10 +63,10 @@ struct Fixture
size_t _runIdx;
ThreadStackExecutor _exec;
Fixture(size_t numRuns = 1)
- : _tracker(new SimpleJobTracker(1)),
- _job(new MyMaintenanceJob(numRuns)),
+ : _tracker(std::make_shared<SimpleJobTracker>(1)),
+ _job(std::make_unique<MyMaintenanceJob>(numRuns)),
_myJob(static_cast<MyMaintenanceJob *>(_job.get())),
- _trackedJob(new JobTrackedMaintenanceJob(_tracker, std::move(_job))),
+ _trackedJob(std::make_unique<JobTrackedMaintenanceJob>(_tracker, std::move(_job))),
_runRetval(false),
_runGates(getGateVector(numRuns)),
_runIdx(0),
@@ -144,9 +142,9 @@ TEST_F("require that block calls are sent to underlying jobs", Fixture)
TEST_F("require that stop calls are sent to underlying jobs", Fixture)
{
- EXPECT_FALSE(f._myJob->_stopped);
- f._trackedJob->onStop();
- EXPECT_TRUE(f._myJob->_stopped);
+ EXPECT_FALSE(f._myJob->stopped());
+ f._trackedJob->stop();
+ EXPECT_TRUE(f._myJob->stopped());
}
TEST_MAIN() { TEST_RUN_ALL(); }
diff --git a/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp b/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp
index 49236a829f5..e73fa739fbb 100644
--- a/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp
@@ -290,17 +290,14 @@ struct MySimpleJob : public BlockableMaintenanceJob
{
vespalib::CountDownLatch _latch;
size_t _runCnt;
- bool _stopped;
MySimpleJob(vespalib::duration delay,
vespalib::duration interval,
uint32_t finishCount)
: BlockableMaintenanceJob("my_job", delay, interval),
_latch(finishCount),
- _runCnt(0),
- _stopped(false)
- {
- }
+ _runCnt(0)
+ { }
void block() { setBlocked(BlockedReason::FROZEN_BUCKET); }
bool run() override {
LOG(info, "MySimpleJob::run()");
@@ -308,10 +305,6 @@ struct MySimpleJob : public BlockableMaintenanceJob
++_runCnt;
return true;
}
- void onStop() override {
- BlockableMaintenanceJob::onStop();
- _stopped = true;
- }
};
struct MySplitJob : public MySimpleJob
@@ -333,13 +326,11 @@ struct MySplitJob : public MySimpleJob
struct MyLongRunningJob : public BlockableMaintenanceJob
{
vespalib::Gate _firstRun;
- bool _stopped;
MyLongRunningJob(vespalib::duration delay,
vespalib::duration interval)
: BlockableMaintenanceJob("long_running_job", delay, interval),
- _firstRun(),
- _stopped(false)
+ _firstRun()
{
}
void block() { setBlocked(BlockedReason::FROZEN_BUCKET); }
@@ -348,10 +339,6 @@ struct MyLongRunningJob : public BlockableMaintenanceJob
usleep(10000);
return false;
}
- void onStop() override {
- BlockableMaintenanceJob::onStop();
- _stopped = true;
- }
};
using MyAttributeManager = test::MockAttributeManager;
diff --git a/searchcore/src/tests/proton/documentdb/move_operation_limiter/move_operation_limiter_test.cpp b/searchcore/src/tests/proton/documentdb/move_operation_limiter/move_operation_limiter_test.cpp
index 1f152f4a257..8466a016fc3 100644
--- a/searchcore/src/tests/proton/documentdb/move_operation_limiter/move_operation_limiter_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/move_operation_limiter/move_operation_limiter_test.cpp
@@ -12,11 +12,9 @@ using namespace proton;
struct MyBlockableMaintenanceJob : public IBlockableMaintenanceJob {
bool blocked;
- bool stopped;
MyBlockableMaintenanceJob()
: IBlockableMaintenanceJob("my_job", 1s, 1s),
- blocked(false),
- stopped(false)
+ blocked(false)
{}
void setBlocked(BlockedReason reason) override {
ASSERT_TRUE(reason == BlockedReason::OUTSTANDING_OPS);
@@ -29,7 +27,7 @@ struct MyBlockableMaintenanceJob : public IBlockableMaintenanceJob {
blocked = false;
}
bool run() override { return true; }
- void onStop() override { stopped = true; }
+ void onStop() override { }
};
struct Fixture {
diff --git a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp
index 9e34462ae21..f2c6dc3a8be 100644
--- a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp
@@ -87,7 +87,6 @@ BucketMoveJobV2::BucketMoveJobV2(const std::shared_ptr<IBucketStateCalculator> &
_movers(),
_bucketsInFlight(),
_buckets2Move(),
- _stopped(false),
_bucketsPending(0),
_bucketCreateNotifier(bucketCreateNotifier),
_clusterStateChangedNotifier(clusterStateChangedNotifier),
@@ -206,9 +205,9 @@ private:
void
BucketMoveJobV2::failOperation(std::shared_ptr<BucketMoveJobV2> job, BucketId bucketId) {
auto & master = job->_master;
- if (job->_stopped) return;
+ if (job->stopped()) return;
master.execute(makeLambdaTask([job=std::move(job), bucketId]() {
- if (job->_stopped.load(std::memory_order_relaxed)) return;
+ if (job->stopped()) return;
job->considerBucket(job->_ready.meta_store()->getBucketDB().takeGuard(), bucketId);
}));
}
@@ -229,12 +228,12 @@ BucketMoveJobV2::startMove(BucketMover & mover, size_t maxDocsToMove) {
void
BucketMoveJobV2::prepareMove(std::shared_ptr<BucketMoveJobV2> job, BucketMover::MoveKeys keys, IDestructorCallbackSP onDone)
{
- if (job->_stopped) return; //TODO Remove once lidtracker is no longer in use.
+ if (job->stopped()) return; //TODO Remove once lidtracker is no longer in use.
auto moveOps = keys.createMoveOperations();
auto & master = job->_master;
- if (job->_stopped) return;
+ if (job->stopped()) return;
master.execute(makeLambdaTask([job=std::move(job), moveOps=std::move(moveOps), onDone=std::move(onDone)]() mutable {
- if (job->_stopped.load(std::memory_order_relaxed)) return;
+ if (job->stopped()) return;
job->completeMove(std::move(moveOps), std::move(onDone));
}));
}
@@ -449,13 +448,6 @@ BucketMoveJobV2::notifyDiskMemUsage(DiskMemUsageState state)
}
void
-BucketMoveJobV2::onStop() {
- // Called by master write thread
- BlockableMaintenanceJob::onStop();
- _stopped = true;
-}
-
-void
BucketMoveJobV2::updatePending() {
_bucketsPending.store(_bucketsInFlight.size() + _buckets2Move.size(), std::memory_order_relaxed);
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h
index df75c8c9766..7d5dafb33b3 100644
--- a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h
+++ b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h
@@ -72,7 +72,6 @@ private:
Bucket2Mover _bucketsInFlight;
BucketMoveSet _buckets2Move;
- std::atomic<bool> _stopped;
std::atomic<size_t> _bucketsPending;
bucketdb::IBucketCreateNotifier &_bucketCreateNotifier;
@@ -142,7 +141,6 @@ public:
void notifyBucketStateChanged(const BucketId &bucketId, ActiveState newState) override;
void notifyDiskMemUsage(DiskMemUsageState state) override;
void notifyCreateBucket(const bucketdb::Guard & guard, const BucketId &bucket) override;
- void onStop() override;
void updateMetrics(DocumentDBTaggedMetrics & metrics) const override;
};
diff --git a/searchcore/src/vespa/searchcore/proton/server/i_maintenance_job.h b/searchcore/src/vespa/searchcore/proton/server/i_maintenance_job.h
index 6bea9855c82..7148576b76f 100644
--- a/searchcore/src/vespa/searchcore/proton/server/i_maintenance_job.h
+++ b/searchcore/src/vespa/searchcore/proton/server/i_maintenance_job.h
@@ -4,6 +4,7 @@
#include <vespa/vespalib/stllike/string.h>
#include <vespa/vespalib/util/time.h>
#include <memory>
+#include <atomic>
namespace proton {
@@ -21,7 +22,9 @@ private:
const vespalib::string _name;
const vespalib::duration _delay;
const vespalib::duration _interval;
-
+ std::atomic<bool> _stopped;
+protected:
+ virtual void onStop() = 0;
public:
using UP = std::unique_ptr<IMaintenanceJob>;
using SP = std::shared_ptr<IMaintenanceJob>;
@@ -31,7 +34,8 @@ public:
vespalib::duration interval)
: _name(name),
_delay(delay),
- _interval(interval)
+ _interval(interval),
+ _stopped(false)
{}
virtual ~IMaintenanceJob() = default;
@@ -41,9 +45,12 @@ public:
virtual vespalib::duration getInterval() const { return _interval; }
virtual bool isBlocked() const { return false; }
virtual IBlockableMaintenanceJob *asBlockable() { return nullptr; }
- virtual void onStop() = 0;
virtual void updateMetrics(DocumentDBTaggedMetrics &) const {}
-
+ void stop() {
+ _stopped = true;
+ onStop();
+ }
+ bool stopped() const { return _stopped.load(std::memory_order_relaxed); }
/**
* Register maintenance job runner, in case event passed to the
* job causes it to want to be run again.
diff --git a/searchcore/src/vespa/searchcore/proton/server/job_tracked_maintenance_job.h b/searchcore/src/vespa/searchcore/proton/server/job_tracked_maintenance_job.h
index 0e1b2b00ce5..ecc592a00a4 100644
--- a/searchcore/src/vespa/searchcore/proton/server/job_tracked_maintenance_job.h
+++ b/searchcore/src/vespa/searchcore/proton/server/job_tracked_maintenance_job.h
@@ -9,7 +9,7 @@ namespace proton {
/**
* Class for tracking the start and end of a maintenance job.
*/
-class JobTrackedMaintenanceJob : public IMaintenanceJob
+class JobTrackedMaintenanceJob final : public IMaintenanceJob
{
private:
IJobTracker::SP _tracker;
@@ -26,7 +26,7 @@ public:
_job->registerRunner(runner);
}
bool run() override;
- void onStop() override { _job->onStop(); }
+ void onStop() override { _job->stop(); }
};
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.cpp b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.cpp
index f2333f2f2b7..89a2e226046 100644
--- a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.cpp
@@ -59,7 +59,7 @@ public:
void fail(const Bucket & bucket) override {
assert(bucket.getBucketId() == _meta.bucketId);
auto & master = _job->_master;
- if (_job->_stopped) return;
+ if (_job->stopped()) return;
master.execute(makeLambdaTask([job=std::move(_job)] { job->_scanItr.reset(); }));
}
private:
@@ -88,7 +88,7 @@ void
CompactionJob::moveDocument(std::shared_ptr<CompactionJob> job, const search::DocumentMetaData & metaThen,
std::shared_ptr<IDestructorCallback> context)
{
- if (job->_stopped) return; //TODO Remove once lidtracker is no longer in use.
+ if (job->stopped()) return; //TODO Remove once lidtracker is no longer in use.
// The real lid must be sampled in the master thread.
//TODO remove target lid from createMoveOperation interface
auto op = job->_handler->createMoveOperation(metaThen, 0);
@@ -97,9 +97,9 @@ CompactionJob::moveDocument(std::shared_ptr<CompactionJob> job, const search::Do
if (metaThen.gid != op->getDocument()->getId().getGlobalId()) return;
auto & master = job->_master;
- if (job->_stopped) return;
+ if (job->stopped()) return;
master.execute(makeLambdaTask([self=std::move(job), meta=metaThen, moveOp=std::move(op), onDone=std::move(context)]() mutable {
- if (self->_stopped.load(std::memory_order_relaxed)) return;
+ if (self->stopped()) return;
self->completeMove(meta, std::move(moveOp), std::move(onDone));
}));
}
@@ -151,8 +151,7 @@ CompactionJob::CompactionJob(const DocumentDBLidSpaceCompactionConfig &config,
_master(master),
_bucketExecutor(bucketExecutor),
_dbRetainer(std::move(dbRetainer)),
- _bucketSpace(bucketSpace),
- _stopped(false)
+ _bucketSpace(bucketSpace)
{
_diskMemUsageNotifier.addDiskMemUsageListener(this);
_clusterStateChangedNotifier.addClusterStateChangedHandler(this);
@@ -189,12 +188,6 @@ CompactionJob::create(const DocumentDBLidSpaceCompactionConfig &config,
});
}
-void
-CompactionJob::onStop() {
- BlockableMaintenanceJob::onStop();
- _stopped = true;
-}
-
DocumentMetaData
CompactionJob::getNextDocument(const LidUsageStats &stats, bool retryLastDocument)
{
diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.h b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.h
index 141f29f4510..725c7387bdc 100644
--- a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.h
+++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.h
@@ -52,7 +52,6 @@ private:
BucketExecutor &_bucketExecutor;
RetainGuard _dbRetainer;
document::BucketSpace _bucketSpace;
- std::atomic<bool> _stopped;
bool hasTooMuchLidBloat(const search::LidUsageStats &stats) const;
bool shouldRestartScanDocuments(const search::LidUsageStats &stats) const;
@@ -66,7 +65,6 @@ private:
std::shared_ptr<IDestructorCallback> onDone);
void completeMove(const search::DocumentMetaData & metaThen, std::unique_ptr<MoveOperation> moveOp,
std::shared_ptr<IDestructorCallback> onDone);
- void onStop() override;
class MoveTask;
CompactionJob(const DocumentDBLidSpaceCompactionConfig &config,
diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenancejobrunner.cpp b/searchcore/src/vespa/searchcore/proton/server/maintenancejobrunner.cpp
index 658fa9f7482..d508c3193fc 100644
--- a/searchcore/src/vespa/searchcore/proton/server/maintenancejobrunner.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/maintenancejobrunner.cpp
@@ -25,7 +25,7 @@ MaintenanceJobRunner::stop() {
Guard guard(_lock);
_stopped = true;
}
- _job->onStop();
+ _job->stop();
}
void
diff --git a/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob_v2.cpp b/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob_v2.cpp
index f2cb6f3c270..0c5b165611e 100644
--- a/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob_v2.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob_v2.cpp
@@ -36,7 +36,6 @@ PruneRemovedDocumentsJobV2(const DocumentDBPruneConfig &config, RetainGuard dbRe
_cfgAgeLimit(config.getAge()),
_subDbId(subDbId),
_bucketSpace(bucketSpace),
- _stopped(false),
_nextLid(1u)
{
}
@@ -75,7 +74,7 @@ PruneRemovedDocumentsJobV2::PruneTask::run(const Bucket & bucket, IDestructorCal
void
PruneRemovedDocumentsJobV2::remove(uint32_t lid, const RawDocumentMetaData & oldMeta) {
- if (_stopped.load(std::memory_order_relaxed)) return;
+ if (stopped()) return;
if ( ! _metaStore.validLid(lid)) return;
const RawDocumentMetaData &meta = _metaStore.getRawMetaData(lid);
if (meta.getBucketId() != oldMeta.getBucketId()) return;
diff --git a/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob_v2.h b/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob_v2.h
index 98330c75d5c..b0eefedf8e2 100644
--- a/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob_v2.h
+++ b/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob_v2.h
@@ -40,7 +40,6 @@ private:
const vespalib::duration _cfgAgeLimit;
const uint32_t _subDbId;
const document::BucketSpace _bucketSpace;
- std::atomic<bool> _stopped;
DocId _nextLid;
@@ -51,7 +50,6 @@ private:
IPruneRemovedDocumentsHandler &handler, IThreadService & master,
BucketExecutor & bucketExecutor);
bool run() override;
- void onStop() override { _stopped = true; }
public:
static std::shared_ptr<PruneRemovedDocumentsJobV2>
create(const Config &config, RetainGuard dbRetainer, const IDocumentMetaStore &metaStore, uint32_t subDbId,