From 1d7dd8e92608580a10ac1056e6590408db4602e5 Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Fri, 9 Apr 2021 21:57:23 +0000 Subject: Use shared_ptr to lid space compaction job in order to ensure correct lifetime, and to avoid having to wait for pending operations when reconfiguring or other events requiring move jobs to be stopped. --- .../lid_space_compaction/lid_space_jobtest.cpp | 4 +- .../lid_space_compaction/lid_space_jobtest.h | 2 +- .../server/lid_space_compaction_job_base.cpp | 3 - .../proton/server/lid_space_compaction_job_base.h | 1 - .../server/lid_space_compaction_job_take2.cpp | 81 ++++++++++------------ .../proton/server/lid_space_compaction_job_take2.h | 8 +-- 6 files changed, 42 insertions(+), 57 deletions(-) (limited to 'searchcore') diff --git a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.cpp b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.cpp index a82437012c2..683709317c2 100644 --- a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.cpp +++ b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.cpp @@ -56,11 +56,11 @@ JobTestBase::init(uint32_t allowedLidBloat, _singleExecutor = std::make_unique(1, 0x10000); _master = std::make_unique (*_singleExecutor); _bucketExecutor = std::make_unique(4); - _job = std::make_unique(compactCfg, _handler, _storer, *_master, *_bucketExecutor, + _job = std::make_shared(compactCfg, _handler, _storer, *_master, *_bucketExecutor, _diskMemUsageNotifier, blockableCfg, _clusterStateHandler, nodeRetired, document::BucketSpace::placeHolder()); } else { - _job = std::make_unique(compactCfg, _handler, _storer, _frozenHandler, _diskMemUsageNotifier, + _job = std::make_shared(compactCfg, _handler, _storer, _frozenHandler, _diskMemUsageNotifier, blockableCfg, _clusterStateHandler, nodeRetired); } } diff --git a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.h b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.h index a78ad66592f..0a4e5c56acb 100644 --- a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.h +++ b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.h @@ -16,7 +16,7 @@ struct JobTestBase : public ::testing::TestWithParam { MyFrozenBucketHandler _frozenHandler; test::DiskMemUsageNotifier _diskMemUsageNotifier; test::ClusterStateHandler _clusterStateHandler; - std::unique_ptr _job; + std::shared_ptr _job; JobTestBase(); ~JobTestBase() override; void init(uint32_t allowedLidBloat, diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_base.cpp b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_base.cpp index 99efc3d4e41..225d128f9bf 100644 --- a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_base.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_base.cpp @@ -130,9 +130,6 @@ LidSpaceCompactionJobBase::run() } if (_scanItr && !_scanItr->valid()) { - if (!inSync()) { - return false; - } if (shouldRestartScanDocuments(_handler->getLidStatus())) { _scanItr = _handler->getIterator(); } else { diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_base.h b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_base.h index 774d82a5d1d..d127025c496 100644 --- a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_base.h +++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_base.h @@ -50,7 +50,6 @@ private: void compactLidSpace(const search::LidUsageStats &stats); bool remove_batch_is_ongoing() const; bool remove_is_ongoing() const; - virtual bool inSync() const { return true; } protected: search::DocumentMetaData getNextDocument(const search::LidUsageStats &stats, bool retryLastDocument); public: diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.cpp b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.cpp index 70a63a4a4eb..52fd18cd971 100644 --- a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.cpp @@ -23,31 +23,40 @@ namespace proton::lidspace { namespace { -class IncOnDestruct { -public: - IncOnDestruct(std::atomic & count) : _count(count) {} - ~IncOnDestruct() { - _count.fetch_add(1, std::memory_order_relaxed); - } -private: - std::atomic & _count; -}; - bool -isSameDocument(const search::DocumentMetaData & a, const search::DocumentMetaData & b) { +isSameDocument(const search::DocumentMetaData &a, const search::DocumentMetaData &b) { return (a.lid == b.lid) && (a.bucketId == b.bucketId) && (a.gid == b.gid) && - (a.timestamp == b.timestamp); // Timestamp check can be removed once logic has proved itself in large scale. + (a.timestamp == + b.timestamp); // Timestamp check can be removed once logic has proved itself in large scale. } } -void -CompactionJob::failOperation() { - IncOnDestruct countGuard(_executedCount); - _master.execute(makeLambdaTask([this] { _scanItr.reset(); })); -} +class CompactionJob::MoveTask : public storage::spi::BucketTask { +public: + MoveTask(std::shared_ptr job, const search::DocumentMetaData & meta, IDestructorCallback::SP opsTracker) + : _job(std::move(job)), + _meta(meta), + _opsTracker(std::move(opsTracker)) + { } + void run(const Bucket & bucket, IDestructorCallback::SP onDone) override { + assert(bucket.getBucketId() == _meta.bucketId); + using DoneContext = vespalib::KeepAlive>; + CompactionJob::moveDocument(std::move(_job), _meta, + std::make_shared(std::make_pair(std::move(_opsTracker), std::move(onDone)))); + } + void fail(const Bucket & bucket) override { + assert(bucket.getBucketId() == _meta.bucketId); + auto & master = _job->_master; + master.execute(makeLambdaTask([job=std::move(_job)] { job->_scanItr.reset(); })); + } +private: + std::shared_ptr _job; + const search::DocumentMetaData _meta; + IDestructorCallback::SP _opsTracker; +}; bool CompactionJob::scanDocuments(const LidUsageStats &stats) @@ -56,16 +65,7 @@ CompactionJob::scanDocuments(const LidUsageStats &stats) DocumentMetaData document = getNextDocument(stats, false); if (document.valid()) { Bucket metaBucket(document::Bucket(_bucketSpace, document.bucketId)); - IDestructorCallback::SP context = getLimiter().beginOperation(); - auto bucketTask = makeBucketTask([this, meta=document, opsTracker=std::move(context)] - (const Bucket & bucket, std::shared_ptr onDone) { - assert(bucket.getBucketId() == meta.bucketId); - using DoneContext = vespalib::KeepAlive>; - moveDocument(meta, std::make_shared(std::make_pair(std::move(opsTracker), std::move(onDone)))); - }, [this](const Bucket &) { failOperation(); }); - - _startedCount.fetch_add(1, std::memory_order_relaxed); - _bucketExecutor.execute(metaBucket, std::move(bucketTask)); + _bucketExecutor.execute(metaBucket, std::make_unique(shared_from_this(), document, getLimiter().beginOperation())); if (isBlocked(BlockedReason::OUTSTANDING_OPS)) { return true; } @@ -75,19 +75,20 @@ CompactionJob::scanDocuments(const LidUsageStats &stats) } void -CompactionJob::moveDocument(const search::DocumentMetaData & metaThen, std::shared_ptr context) { - IncOnDestruct countGuard(_executedCount); - if (_stopped.load(std::memory_order_relaxed)) return; +CompactionJob::moveDocument(std::shared_ptr job, const search::DocumentMetaData & metaThen, + std::shared_ptr context) +{ // The real lid must be sampled in the master thread. //TODO remove target lid from createMoveOperation interface - auto op = _handler->createMoveOperation(metaThen, 0); + auto op = job->_handler->createMoveOperation(metaThen, 0); if (!op || !op->getDocument()) return; // Early detection and force md5 calculation outside of master thread if (metaThen.gid != op->getDocument()->getId().getGlobalId()) return; - _master.execute(makeLambdaTask([this, meta=metaThen, moveOp=std::move(op), onDone=std::move(context)]() mutable { - if (_stopped.load(std::memory_order_relaxed)) return; - completeMove(meta, std::move(moveOp), std::move(onDone)); + auto & master = job->_master; + master.execute(makeLambdaTask([self=std::move(job), meta=metaThen, moveOp=std::move(op), onDone=std::move(context)]() mutable { + if (self->_stopped.load(std::memory_order_relaxed)) return; + self->completeMove(meta, std::move(moveOp), std::move(onDone)); })); } @@ -125,24 +126,14 @@ CompactionJob::CompactionJob(const DocumentDBLidSpaceCompactionConfig &config, _master(master), _bucketExecutor(bucketExecutor), _bucketSpace(bucketSpace), - _stopped(false), - _startedCount(0), - _executedCount(0) + _stopped(false) { } CompactionJob::~CompactionJob() = default; -bool -CompactionJob::inSync() const { - return _executedCount == _startedCount; -} - void CompactionJob::onStop() { _stopped = true; - while ( ! inSync() ) { - std::this_thread::sleep_for(1ms); - } } } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.h b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.h index 21445de8d85..bbf39e5adf6 100644 --- a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.h +++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.h @@ -31,16 +31,14 @@ private: BucketExecutor &_bucketExecutor; document::BucketSpace _bucketSpace; std::atomic _stopped; - std::atomic _startedCount; - std::atomic _executedCount; bool scanDocuments(const search::LidUsageStats &stats) override; - void moveDocument(const search::DocumentMetaData & metaThen, std::shared_ptr onDone); + static void moveDocument(std::shared_ptr job, const search::DocumentMetaData & metaThen, + std::shared_ptr onDone); void completeMove(const search::DocumentMetaData & metaThen, std::unique_ptr moveOp, std::shared_ptr onDone); void onStop() override; - bool inSync() const override; - void failOperation(); + class MoveTask; public: CompactionJob(const DocumentDBLidSpaceCompactionConfig &config, std::shared_ptr handler, -- cgit v1.2.3