summary | refs | log | tree | commit | diff | stats
path: root/searchcore
diff options
context:
space:
mode:
author: Henning Baldersheim <balder@yahoo-inc.com> 2021-04-09 21:57:23 +0000
committer: Henning Baldersheim <balder@yahoo-inc.com> 2021-04-09 22:26:51 +0000
commit: 1d7dd8e92608580a10ac1056e6590408db4602e5 (patch)
tree: 86bea912120441acb5ffbd602338e7b469740ac1 /searchcore
parent: 4bacf991453a3b88f99aad3c1c1caded5af29d83 (diff)
Use a shared_ptr to the lid space compaction job in order to ensure correct lifetime, and to avoid having to wait for pending operations
when reconfiguring, or on other events that require move jobs to be stopped.
Diffstat (limited to 'searchcore')
-rw-r--r-- searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.cpp | 4
-rw-r--r-- searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.h | 2
-rw-r--r-- searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_base.cpp | 3
-rw-r--r-- searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_base.h | 1
-rw-r--r-- searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.cpp | 81
-rw-r--r-- searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.h | 8
6 files changed, 42 insertions, 57 deletions
diff --git a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.cpp b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.cpp
index a82437012c2..683709317c2 100644
--- a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.cpp
+++ b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.cpp
@@ -56,11 +56,11 @@ JobTestBase::init(uint32_t allowedLidBloat,
_singleExecutor = std::make_unique<vespalib::ThreadStackExecutor>(1, 0x10000);
_master = std::make_unique<proton::ExecutorThreadService> (*_singleExecutor);
_bucketExecutor = std::make_unique<storage::spi::dummy::DummyBucketExecutor>(4);
- _job = std::make_unique<lidspace::CompactionJob>(compactCfg, _handler, _storer, *_master, *_bucketExecutor,
+ _job = std::make_shared<lidspace::CompactionJob>(compactCfg, _handler, _storer, *_master, *_bucketExecutor,
_diskMemUsageNotifier, blockableCfg, _clusterStateHandler, nodeRetired,
document::BucketSpace::placeHolder());
} else {
- _job = std::make_unique<LidSpaceCompactionJob>(compactCfg, _handler, _storer, _frozenHandler, _diskMemUsageNotifier,
+ _job = std::make_shared<LidSpaceCompactionJob>(compactCfg, _handler, _storer, _frozenHandler, _diskMemUsageNotifier,
blockableCfg, _clusterStateHandler, nodeRetired);
}
}
diff --git a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.h b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.h
index a78ad66592f..0a4e5c56acb 100644
--- a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.h
+++ b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.h
@@ -16,7 +16,7 @@ struct JobTestBase : public ::testing::TestWithParam<bool> {
MyFrozenBucketHandler _frozenHandler;
test::DiskMemUsageNotifier _diskMemUsageNotifier;
test::ClusterStateHandler _clusterStateHandler;
- std::unique_ptr<BlockableMaintenanceJob> _job;
+ std::shared_ptr<BlockableMaintenanceJob> _job;
JobTestBase();
~JobTestBase() override;
void init(uint32_t allowedLidBloat,
diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_base.cpp b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_base.cpp
index 99efc3d4e41..225d128f9bf 100644
--- a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_base.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_base.cpp
@@ -130,9 +130,6 @@ LidSpaceCompactionJobBase::run()
}
if (_scanItr && !_scanItr->valid()) {
- if (!inSync()) {
- return false;
- }
if (shouldRestartScanDocuments(_handler->getLidStatus())) {
_scanItr = _handler->getIterator();
} else {
diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_base.h b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_base.h
index 774d82a5d1d..d127025c496 100644
--- a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_base.h
+++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_base.h
@@ -50,7 +50,6 @@ private:
void compactLidSpace(const search::LidUsageStats &stats);
bool remove_batch_is_ongoing() const;
bool remove_is_ongoing() const;
- virtual bool inSync() const { return true; }
protected:
search::DocumentMetaData getNextDocument(const search::LidUsageStats &stats, bool retryLastDocument);
public:
diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.cpp b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.cpp
index 70a63a4a4eb..52fd18cd971 100644
--- a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.cpp
@@ -23,31 +23,40 @@ namespace proton::lidspace {
namespace {
-class IncOnDestruct {
-public:
- IncOnDestruct(std::atomic<size_t> & count) : _count(count) {}
- ~IncOnDestruct() {
- _count.fetch_add(1, std::memory_order_relaxed);
- }
-private:
- std::atomic<size_t> & _count;
-};
-
bool
-isSameDocument(const search::DocumentMetaData & a, const search::DocumentMetaData & b) {
+isSameDocument(const search::DocumentMetaData &a, const search::DocumentMetaData &b) {
return (a.lid == b.lid) &&
(a.bucketId == b.bucketId) &&
(a.gid == b.gid) &&
- (a.timestamp == b.timestamp); // Timestamp check can be removed once logic has proved itself in large scale.
+ (a.timestamp ==
+ b.timestamp); // Timestamp check can be removed once logic has proved itself in large scale.
}
}
-void
-CompactionJob::failOperation() {
- IncOnDestruct countGuard(_executedCount);
- _master.execute(makeLambdaTask([this] { _scanItr.reset(); }));
-}
+class CompactionJob::MoveTask : public storage::spi::BucketTask {
+public:
+ MoveTask(std::shared_ptr<CompactionJob> job, const search::DocumentMetaData & meta, IDestructorCallback::SP opsTracker)
+ : _job(std::move(job)),
+ _meta(meta),
+ _opsTracker(std::move(opsTracker))
+ { }
+ void run(const Bucket & bucket, IDestructorCallback::SP onDone) override {
+ assert(bucket.getBucketId() == _meta.bucketId);
+ using DoneContext = vespalib::KeepAlive<std::pair<IDestructorCallback::SP, IDestructorCallback::SP>>;
+ CompactionJob::moveDocument(std::move(_job), _meta,
+ std::make_shared<DoneContext>(std::make_pair(std::move(_opsTracker), std::move(onDone))));
+ }
+ void fail(const Bucket & bucket) override {
+ assert(bucket.getBucketId() == _meta.bucketId);
+ auto & master = _job->_master;
+ master.execute(makeLambdaTask([job=std::move(_job)] { job->_scanItr.reset(); }));
+ }
+private:
+ std::shared_ptr<CompactionJob> _job;
+ const search::DocumentMetaData _meta;
+ IDestructorCallback::SP _opsTracker;
+};
bool
CompactionJob::scanDocuments(const LidUsageStats &stats)
@@ -56,16 +65,7 @@ CompactionJob::scanDocuments(const LidUsageStats &stats)
DocumentMetaData document = getNextDocument(stats, false);
if (document.valid()) {
Bucket metaBucket(document::Bucket(_bucketSpace, document.bucketId));
- IDestructorCallback::SP context = getLimiter().beginOperation();
- auto bucketTask = makeBucketTask([this, meta=document, opsTracker=std::move(context)]
- (const Bucket & bucket, std::shared_ptr<IDestructorCallback> onDone) {
- assert(bucket.getBucketId() == meta.bucketId);
- using DoneContext = vespalib::KeepAlive<std::pair<IDestructorCallback::SP, IDestructorCallback::SP>>;
- moveDocument(meta, std::make_shared<DoneContext>(std::make_pair(std::move(opsTracker), std::move(onDone))));
- }, [this](const Bucket &) { failOperation(); });
-
- _startedCount.fetch_add(1, std::memory_order_relaxed);
- _bucketExecutor.execute(metaBucket, std::move(bucketTask));
+ _bucketExecutor.execute(metaBucket, std::make_unique<MoveTask>(shared_from_this(), document, getLimiter().beginOperation()));
if (isBlocked(BlockedReason::OUTSTANDING_OPS)) {
return true;
}
@@ -75,19 +75,20 @@ CompactionJob::scanDocuments(const LidUsageStats &stats)
}
void
-CompactionJob::moveDocument(const search::DocumentMetaData & metaThen, std::shared_ptr<IDestructorCallback> context) {
- IncOnDestruct countGuard(_executedCount);
- if (_stopped.load(std::memory_order_relaxed)) return;
+CompactionJob::moveDocument(std::shared_ptr<CompactionJob> job, const search::DocumentMetaData & metaThen,
+ std::shared_ptr<IDestructorCallback> context)
+{
// The real lid must be sampled in the master thread.
//TODO remove target lid from createMoveOperation interface
- auto op = _handler->createMoveOperation(metaThen, 0);
+ auto op = job->_handler->createMoveOperation(metaThen, 0);
if (!op || !op->getDocument()) return;
// Early detection and force md5 calculation outside of master thread
if (metaThen.gid != op->getDocument()->getId().getGlobalId()) return;
- _master.execute(makeLambdaTask([this, meta=metaThen, moveOp=std::move(op), onDone=std::move(context)]() mutable {
- if (_stopped.load(std::memory_order_relaxed)) return;
- completeMove(meta, std::move(moveOp), std::move(onDone));
+ auto & master = job->_master;
+ master.execute(makeLambdaTask([self=std::move(job), meta=metaThen, moveOp=std::move(op), onDone=std::move(context)]() mutable {
+ if (self->_stopped.load(std::memory_order_relaxed)) return;
+ self->completeMove(meta, std::move(moveOp), std::move(onDone));
}));
}
@@ -125,24 +126,14 @@ CompactionJob::CompactionJob(const DocumentDBLidSpaceCompactionConfig &config,
_master(master),
_bucketExecutor(bucketExecutor),
_bucketSpace(bucketSpace),
- _stopped(false),
- _startedCount(0),
- _executedCount(0)
+ _stopped(false)
{ }
CompactionJob::~CompactionJob() = default;
-bool
-CompactionJob::inSync() const {
- return _executedCount == _startedCount;
-}
-
void
CompactionJob::onStop() {
_stopped = true;
- while ( ! inSync() ) {
- std::this_thread::sleep_for(1ms);
- }
}
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.h b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.h
index 21445de8d85..bbf39e5adf6 100644
--- a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.h
+++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.h
@@ -31,16 +31,14 @@ private:
BucketExecutor &_bucketExecutor;
document::BucketSpace _bucketSpace;
std::atomic<bool> _stopped;
- std::atomic<size_t> _startedCount;
- std::atomic<size_t> _executedCount;
bool scanDocuments(const search::LidUsageStats &stats) override;
- void moveDocument(const search::DocumentMetaData & metaThen, std::shared_ptr<IDestructorCallback> onDone);
+ static void moveDocument(std::shared_ptr<CompactionJob> job, const search::DocumentMetaData & metaThen,
+ std::shared_ptr<IDestructorCallback> onDone);
void completeMove(const search::DocumentMetaData & metaThen, std::unique_ptr<MoveOperation> moveOp,
std::shared_ptr<IDestructorCallback> onDone);
void onStop() override;
- bool inSync() const override;
- void failOperation();
+ class MoveTask;
public:
CompactionJob(const DocumentDBLidSpaceCompactionConfig &config,
std::shared_ptr<ILidSpaceCompactionHandler> handler,