diff options
Diffstat (limited to 'searchcore')
6 files changed, 42 insertions, 57 deletions
diff --git a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.cpp b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.cpp index a82437012c2..683709317c2 100644 --- a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.cpp +++ b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.cpp @@ -56,11 +56,11 @@ JobTestBase::init(uint32_t allowedLidBloat, _singleExecutor = std::make_unique<vespalib::ThreadStackExecutor>(1, 0x10000); _master = std::make_unique<proton::ExecutorThreadService> (*_singleExecutor); _bucketExecutor = std::make_unique<storage::spi::dummy::DummyBucketExecutor>(4); - _job = std::make_unique<lidspace::CompactionJob>(compactCfg, _handler, _storer, *_master, *_bucketExecutor, + _job = std::make_shared<lidspace::CompactionJob>(compactCfg, _handler, _storer, *_master, *_bucketExecutor, _diskMemUsageNotifier, blockableCfg, _clusterStateHandler, nodeRetired, document::BucketSpace::placeHolder()); } else { - _job = std::make_unique<LidSpaceCompactionJob>(compactCfg, _handler, _storer, _frozenHandler, _diskMemUsageNotifier, + _job = std::make_shared<LidSpaceCompactionJob>(compactCfg, _handler, _storer, _frozenHandler, _diskMemUsageNotifier, blockableCfg, _clusterStateHandler, nodeRetired); } } diff --git a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.h b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.h index a78ad66592f..0a4e5c56acb 100644 --- a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.h +++ b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.h @@ -16,7 +16,7 @@ struct JobTestBase : public ::testing::TestWithParam<bool> { MyFrozenBucketHandler _frozenHandler; test::DiskMemUsageNotifier _diskMemUsageNotifier; test::ClusterStateHandler _clusterStateHandler; - std::unique_ptr<BlockableMaintenanceJob> _job; + std::shared_ptr<BlockableMaintenanceJob> _job; JobTestBase(); ~JobTestBase() override; void init(uint32_t allowedLidBloat, diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_base.cpp b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_base.cpp index 99efc3d4e41..225d128f9bf 100644 --- a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_base.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_base.cpp @@ -130,9 +130,6 @@ LidSpaceCompactionJobBase::run() } if (_scanItr && !_scanItr->valid()) { - if (!inSync()) { - return false; - } if (shouldRestartScanDocuments(_handler->getLidStatus())) { _scanItr = _handler->getIterator(); } else { diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_base.h b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_base.h index 774d82a5d1d..d127025c496 100644 --- a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_base.h +++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_base.h @@ -50,7 +50,6 @@ private: void compactLidSpace(const search::LidUsageStats &stats); bool remove_batch_is_ongoing() const; bool remove_is_ongoing() const; - virtual bool inSync() const { return true; } protected: search::DocumentMetaData getNextDocument(const search::LidUsageStats &stats, bool retryLastDocument); public: diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.cpp b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.cpp index 70a63a4a4eb..52fd18cd971 100644 --- a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.cpp @@ -23,31 +23,40 @@ namespace proton::lidspace { namespace { -class IncOnDestruct { -public: - IncOnDestruct(std::atomic<size_t> & count) : _count(count) {} - ~IncOnDestruct() { - _count.fetch_add(1, std::memory_order_relaxed); - } -private: - std::atomic<size_t> & _count; -}; - bool -isSameDocument(const search::DocumentMetaData & a, const search::DocumentMetaData & b) { +isSameDocument(const search::DocumentMetaData &a, const search::DocumentMetaData &b) { return (a.lid == b.lid) && (a.bucketId == b.bucketId) && (a.gid == b.gid) && - (a.timestamp == b.timestamp); // Timestamp check can be removed once logic has proved itself in large scale. + (a.timestamp == + b.timestamp); // Timestamp check can be removed once logic has proved itself in large scale. } } -void -CompactionJob::failOperation() { - IncOnDestruct countGuard(_executedCount); - _master.execute(makeLambdaTask([this] { _scanItr.reset(); })); -} +class CompactionJob::MoveTask : public storage::spi::BucketTask { +public: + MoveTask(std::shared_ptr<CompactionJob> job, const search::DocumentMetaData & meta, IDestructorCallback::SP opsTracker) + : _job(std::move(job)), + _meta(meta), + _opsTracker(std::move(opsTracker)) + { } + void run(const Bucket & bucket, IDestructorCallback::SP onDone) override { + assert(bucket.getBucketId() == _meta.bucketId); + using DoneContext = vespalib::KeepAlive<std::pair<IDestructorCallback::SP, IDestructorCallback::SP>>; + CompactionJob::moveDocument(std::move(_job), _meta, + std::make_shared<DoneContext>(std::make_pair(std::move(_opsTracker), std::move(onDone)))); + } + void fail(const Bucket & bucket) override { + assert(bucket.getBucketId() == _meta.bucketId); + auto & master = _job->_master; + master.execute(makeLambdaTask([job=std::move(_job)] { job->_scanItr.reset(); })); + } +private: + std::shared_ptr<CompactionJob> _job; + const search::DocumentMetaData _meta; + IDestructorCallback::SP _opsTracker; +}; bool CompactionJob::scanDocuments(const LidUsageStats &stats) @@ -56,16 +65,7 @@ CompactionJob::scanDocuments(const LidUsageStats &stats) DocumentMetaData document = getNextDocument(stats, false); if (document.valid()) { Bucket metaBucket(document::Bucket(_bucketSpace, document.bucketId)); - IDestructorCallback::SP context = getLimiter().beginOperation(); - auto bucketTask = makeBucketTask([this, meta=document, opsTracker=std::move(context)] - (const Bucket & bucket, std::shared_ptr<IDestructorCallback> onDone) { - assert(bucket.getBucketId() == meta.bucketId); - using DoneContext = vespalib::KeepAlive<std::pair<IDestructorCallback::SP, IDestructorCallback::SP>>; - moveDocument(meta, std::make_shared<DoneContext>(std::make_pair(std::move(opsTracker), std::move(onDone)))); - }, [this](const Bucket &) { failOperation(); }); - - _startedCount.fetch_add(1, std::memory_order_relaxed); - _bucketExecutor.execute(metaBucket, std::move(bucketTask)); + _bucketExecutor.execute(metaBucket, std::make_unique<MoveTask>(shared_from_this(), document, getLimiter().beginOperation())); if (isBlocked(BlockedReason::OUTSTANDING_OPS)) { return true; } @@ -75,19 +75,20 @@ CompactionJob::scanDocuments(const LidUsageStats &stats) } void -CompactionJob::moveDocument(const search::DocumentMetaData & metaThen, std::shared_ptr<IDestructorCallback> context) { - IncOnDestruct countGuard(_executedCount); - if (_stopped.load(std::memory_order_relaxed)) return; +CompactionJob::moveDocument(std::shared_ptr<CompactionJob> job, const search::DocumentMetaData & metaThen, + std::shared_ptr<IDestructorCallback> context) +{ // The real lid must be sampled in the master thread. //TODO remove target lid from createMoveOperation interface - auto op = _handler->createMoveOperation(metaThen, 0); + auto op = job->_handler->createMoveOperation(metaThen, 0); if (!op || !op->getDocument()) return; // Early detection and force md5 calculation outside of master thread if (metaThen.gid != op->getDocument()->getId().getGlobalId()) return; - _master.execute(makeLambdaTask([this, meta=metaThen, moveOp=std::move(op), onDone=std::move(context)]() mutable { - if (_stopped.load(std::memory_order_relaxed)) return; - completeMove(meta, std::move(moveOp), std::move(onDone)); + auto & master = job->_master; + master.execute(makeLambdaTask([self=std::move(job), meta=metaThen, moveOp=std::move(op), onDone=std::move(context)]() mutable { + if (self->_stopped.load(std::memory_order_relaxed)) return; + self->completeMove(meta, std::move(moveOp), std::move(onDone)); })); } @@ -125,24 +126,14 @@ CompactionJob::CompactionJob(const DocumentDBLidSpaceCompactionConfig &config, _master(master), _bucketExecutor(bucketExecutor), _bucketSpace(bucketSpace), - _stopped(false), - _startedCount(0), - _executedCount(0) + _stopped(false) { } CompactionJob::~CompactionJob() = default; -bool -CompactionJob::inSync() const { - return _executedCount == _startedCount; -} - void CompactionJob::onStop() { _stopped = true; - while ( ! inSync() ) { - std::this_thread::sleep_for(1ms); - } } } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.h b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.h index 21445de8d85..bbf39e5adf6 100644 --- a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.h +++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.h @@ -31,16 +31,14 @@ private: BucketExecutor &_bucketExecutor; document::BucketSpace _bucketSpace; std::atomic<bool> _stopped; - std::atomic<size_t> _startedCount; - std::atomic<size_t> _executedCount; bool scanDocuments(const search::LidUsageStats &stats) override; - void moveDocument(const search::DocumentMetaData & metaThen, std::shared_ptr<IDestructorCallback> onDone); + static void moveDocument(std::shared_ptr<CompactionJob> job, const search::DocumentMetaData & metaThen, + std::shared_ptr<IDestructorCallback> onDone); void completeMove(const search::DocumentMetaData & metaThen, std::unique_ptr<MoveOperation> moveOp, std::shared_ptr<IDestructorCallback> onDone); void onStop() override; - bool inSync() const override; - void failOperation(); + class MoveTask; public: CompactionJob(const DocumentDBLidSpaceCompactionConfig &config, std::shared_ptr<ILidSpaceCompactionHandler> handler, |