diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2021-05-04 06:59:07 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2021-05-04 07:31:15 +0000 |
commit | 3f41990b0d2a35633e5295e1e3ab08e7310a2109 (patch) | |
tree | 7f7872805b4e950c8541818ea8bd6209669f6e56 /searchcore | |
parent | 88909a4ed21d35ffb994713c284b3dbfba006654 (diff) |
Add prune job that uses bucket executor.
Diffstat (limited to 'searchcore')
12 files changed, 267 insertions, 47 deletions
diff --git a/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp b/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp index f401d23b8ad..49236a829f5 100644 --- a/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp +++ b/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp @@ -405,7 +405,7 @@ public: void removeDocs(const test::UserDocuments &docs, Timestamp timestamp); void - setPruneConfig(const DocumentDBPruneRemovedDocumentsConfig &pruneConfig) + setPruneConfig(const DocumentDBPruneConfig &pruneConfig) { auto newCfg = std::make_shared<DocumentDBMaintenanceConfig>( pruneConfig, @@ -954,7 +954,7 @@ TEST_F("require that document pruner is active", MaintenanceControllerFixture) EXPECT_EQUAL(10u, f._removed.getNumUsedLids()); EXPECT_EQUAL(10u, f._removed.getDocumentCount()); MyFrozenBucket::UP frozen3(new MyFrozenBucket(f._mc, bucketId3)); - f.setPruneConfig(DocumentDBPruneRemovedDocumentsConfig(200ms, 900s)); + f.setPruneConfig(DocumentDBPruneConfig(200ms, 900s, false)); for (uint32_t i = 0; i < 6; ++i) { std::this_thread::sleep_for(100ms); ASSERT_TRUE(f._executor.waitIdle(TIMEOUT)); @@ -974,6 +974,43 @@ TEST_F("require that document pruner is active", MaintenanceControllerFixture) EXPECT_EQUAL(5u, f._removed.getDocumentCount()); } +TEST_F("require that document pruner v2 is active", MaintenanceControllerFixture) +{ + uint64_t tshz = 1000000; + uint64_t now = static_cast<uint64_t>(time(nullptr)) * tshz; + Timestamp remTime(static_cast<Timestamp::Type>(now - 3600 * tshz)); + Timestamp keepTime(static_cast<Timestamp::Type>(now + 3600 * tshz)); + f._builder.createDocs(1, 1, 4); // 3 docs + f._builder.createDocs(2, 4, 6); // 2 docs + test::UserDocuments keepDocs(f._builder.getDocs()); + f.removeDocs(keepDocs, keepTime); + f._builder.clearDocs(); + f._builder.createDocs(3, 6, 8); // 2 docs + f._builder.createDocs(4, 8, 11); // 3 docs + test::UserDocuments removeDocs(f._builder.getDocs()); + BucketId bucketId3(removeDocs.getBucket(3)); + f.removeDocs(removeDocs, remTime); + f.notifyClusterStateChanged(); + EXPECT_TRUE(f._executor.isIdle()); + EXPECT_EQUAL(10u, f._removed.getNumUsedLids()); + EXPECT_EQUAL(10u, f._removed.getDocumentCount()); + f.startMaintenance(); + ASSERT_TRUE(f._executor.waitIdle(TIMEOUT)); + EXPECT_EQUAL(10u, f._removed.getNumUsedLids()); + EXPECT_EQUAL(10u, f._removed.getDocumentCount()); + f.setPruneConfig(DocumentDBPruneConfig(200ms, 900s, true)); + for (uint32_t i = 0; i < 600; ++i) { + std::this_thread::sleep_for(100ms); + ASSERT_TRUE(f._executor.waitIdle(TIMEOUT)); + if (f._removed.getNumUsedLids() != 10u) + break; + } + f._bucketExecutor.sync(); + f._executor.sync(); + EXPECT_EQUAL(5u, f._removed.getNumUsedLids()); + EXPECT_EQUAL(5u, f._removed.getDocumentCount()); +} + TEST_F("require that heartbeats are scheduled", MaintenanceControllerFixture) { f.notifyClusterStateChanged(); @@ -1189,7 +1226,7 @@ TEST_F("require that maintenance jobs are run by correct executor", MaintenanceC void assertPruneRemovedDocumentsConfig(vespalib::duration expDelay, vespalib::duration expInterval, vespalib::duration interval, MaintenanceControllerFixture &f) { - f.setPruneConfig(DocumentDBPruneRemovedDocumentsConfig(interval, 1000s)); + f.setPruneConfig(DocumentDBPruneConfig(interval, 1000s, true)); const auto *job = findJob(f._mc.getJobList(), "prune_removed_documents.searchdocument"); EXPECT_EQUAL(expDelay, job->getJob().getDelay()); EXPECT_EQUAL(expInterval, job->getJob().getInterval()); diff --git a/searchcore/src/vespa/searchcore/config/proton.def b/searchcore/src/vespa/searchcore/config/proton.def index 050930352b7..835ac5ae286 100644 --- a/searchcore/src/vespa/searchcore/config/proton.def +++ b/searchcore/src/vespa/searchcore/config/proton.def @@ -352,6 +352,9 @@ pruneremoveddocumentsinterval double default=0.0 ## Default value is 2 weeks (1209600 seconds). pruneremoveddocumentsage double default=1209600.0 +## Set to true to enable bucket locking via content layer +pruneremoveddocuments.usebucketexecutor bool default=true + ## Minimum size of packets to compress (0 means no compression) ## packetcompresslimit int default=1024 diff --git a/searchcore/src/vespa/searchcore/proton/feedoperation/pruneremoveddocumentsoperation.h b/searchcore/src/vespa/searchcore/proton/feedoperation/pruneremoveddocumentsoperation.h index 1f813c641b9..dc10812062e 100644 --- a/searchcore/src/vespa/searchcore/proton/feedoperation/pruneremoveddocumentsoperation.h +++ b/searchcore/src/vespa/searchcore/proton/feedoperation/pruneremoveddocumentsoperation.h @@ -15,36 +15,22 @@ public: typedef std::unique_ptr<PruneRemovedDocumentsOperation> UP; PruneRemovedDocumentsOperation(); - - PruneRemovedDocumentsOperation(search::DocumentIdT docIdLimit, - uint32_t subDbId); - - virtual - ~PruneRemovedDocumentsOperation() - { - } + PruneRemovedDocumentsOperation(search::DocumentIdT docIdLimit, uint32_t subDbId); uint32_t getSubDbId() const { return _subDbId; } - void setLidsToRemove(const LidVectorContext::SP &lidsToRemove) - { + void setLidsToRemove(const LidVectorContext::SP &lidsToRemove) { RemoveDocumentsOperation::setLidsToRemove(_subDbId, lidsToRemove); } const LidVectorContext::SP - getLidsToRemove() const - { + getLidsToRemove() const { return RemoveDocumentsOperation::getLidsToRemove(_subDbId); } - virtual void - serialize(vespalib::nbostream &os) const override; - - virtual void - deserialize(vespalib::nbostream &is, - const document::DocumentTypeRepo &repo) override; - - virtual vespalib::string toString() const override; + void serialize(vespalib::nbostream &os) const override; + void deserialize(vespalib::nbostream &is, const document::DocumentTypeRepo &repo) override; + vespalib::string toString() const override; }; } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt b/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt index 9d359590912..78e04c066f3 100644 --- a/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt +++ b/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt @@ -82,6 +82,7 @@ vespa_add_library(searchcore_server STATIC proton_thread_pools_explorer.cpp prune_session_cache_job.cpp pruneremoveddocumentsjob.cpp + pruneremoveddocumentsjob_v2.cpp putdonecontext.cpp reconfig_params.cpp remove_operations_rate_tracker.cpp diff --git a/searchcore/src/vespa/searchcore/proton/server/document_db_maintenance_config.cpp b/searchcore/src/vespa/searchcore/proton/server/document_db_maintenance_config.cpp index f3b0c3ff305..10979ebc6e2 100644 --- a/searchcore/src/vespa/searchcore/proton/server/document_db_maintenance_config.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/document_db_maintenance_config.cpp @@ -7,26 +7,26 @@ namespace proton { constexpr vespalib::duration MAX_DELAY_SEC = 300s; DocumentDBPruneConfig::DocumentDBPruneConfig() noexcept - : _delay(MAX_DELAY_SEC), - _interval(21600s), - _age(1209600s) + : DocumentDBPruneConfig(21600s, 1209600s, false) { } DocumentDBPruneConfig:: -DocumentDBPruneConfig(vespalib::duration interval, vespalib::duration age) noexcept +DocumentDBPruneConfig(vespalib::duration interval, vespalib::duration age, bool useBucketExecutor) noexcept : _delay(std::min(MAX_DELAY_SEC, interval)), _interval(interval), - _age(age) + _age(age), + _useBucketExecutor(useBucketExecutor) { } bool DocumentDBPruneConfig::operator==(const DocumentDBPruneConfig &rhs) const noexcept { - return _delay == rhs._delay && - _interval == rhs._interval && - _age == rhs._age; + return (_delay == rhs._delay) && + (_interval == rhs._interval) && + (_age == rhs._age) && + (_useBucketExecutor == rhs._useBucketExecutor); } DocumentDBHeartBeatConfig::DocumentDBHeartBeatConfig() noexcept @@ -144,7 +144,7 @@ DocumentDBMaintenanceConfig::DocumentDBMaintenanceConfig() noexcept DocumentDBMaintenanceConfig::~DocumentDBMaintenanceConfig() = default; DocumentDBMaintenanceConfig:: -DocumentDBMaintenanceConfig(const DocumentDBPruneRemovedDocumentsConfig &pruneRemovedDocuments, +DocumentDBMaintenanceConfig(const DocumentDBPruneConfig &pruneRemovedDocuments, const DocumentDBHeartBeatConfig &heartBeat, vespalib::duration groupingSessionPruneInterval, vespalib::duration visibilityDelay, diff --git a/searchcore/src/vespa/searchcore/proton/server/document_db_maintenance_config.h b/searchcore/src/vespa/searchcore/proton/server/document_db_maintenance_config.h index af8194a4a19..ed6494c117d 100644 --- a/searchcore/src/vespa/searchcore/proton/server/document_db_maintenance_config.h +++ b/searchcore/src/vespa/searchcore/proton/server/document_db_maintenance_config.h @@ -15,18 +15,20 @@ private: vespalib::duration _delay; vespalib::duration _interval; vespalib::duration _age; + bool _useBucketExecutor; public: DocumentDBPruneConfig() noexcept; - DocumentDBPruneConfig(vespalib::duration interval, vespalib::duration age) noexcept; + DocumentDBPruneConfig(vespalib::duration interval, vespalib::duration age, bool useBucketExecutor) noexcept; bool operator==(const DocumentDBPruneConfig &rhs) const noexcept; vespalib::duration getDelay() const noexcept { return _delay; } vespalib::duration getInterval() const noexcept { return _interval; } vespalib::duration getAge() const noexcept { return _age; } + bool useBucketExecutor() const noexcept { return _useBucketExecutor; } }; -typedef DocumentDBPruneConfig DocumentDBPruneRemovedDocumentsConfig; +using DocumentDBPruneRemovedDocumentsConfig = DocumentDBPruneConfig; class DocumentDBHeartBeatConfig { @@ -107,7 +109,7 @@ public: typedef std::shared_ptr<DocumentDBMaintenanceConfig> SP; private: - DocumentDBPruneRemovedDocumentsConfig _pruneRemovedDocuments; + DocumentDBPruneConfig _pruneRemovedDocuments; DocumentDBHeartBeatConfig _heartBeat; vespalib::duration _sessionCachePruneInterval; vespalib::duration _visibilityDelay; @@ -120,7 +122,7 @@ private: public: DocumentDBMaintenanceConfig() noexcept; - DocumentDBMaintenanceConfig(const DocumentDBPruneRemovedDocumentsConfig &pruneRemovedDocuments, + DocumentDBMaintenanceConfig(const DocumentDBPruneConfig &pruneRemovedDocuments, const DocumentDBHeartBeatConfig &heartBeat, vespalib::duration sessionCachePruneInterval, vespalib::duration visibilityDelay, @@ -139,7 +141,7 @@ public: bool operator==(const DocumentDBMaintenanceConfig &rhs) const noexcept ; - const DocumentDBPruneRemovedDocumentsConfig &getPruneRemovedDocumentsConfig() const noexcept { + const DocumentDBPruneConfig &getPruneRemovedDocumentsConfig() const noexcept { return _pruneRemovedDocuments; } const DocumentDBHeartBeatConfig &getHeartBeatConfig() const noexcept { diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdbconfigmanager.cpp b/searchcore/src/vespa/searchcore/proton/server/documentdbconfigmanager.cpp index fbdff698b9b..feab911441a 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentdbconfigmanager.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/documentdbconfigmanager.cpp @@ -132,9 +132,9 @@ buildMaintenanceConfig(const BootstrapConfig::SP &bootstrapConfig, isDocumentTypeGlobal = ddbConfig.global; } return std::make_shared<DocumentDBMaintenanceConfig>( - DocumentDBPruneRemovedDocumentsConfig( - pruneRemovedDocumentsInterval, - pruneRemovedDocumentsAge), + DocumentDBPruneConfig(pruneRemovedDocumentsInterval, + pruneRemovedDocumentsAge, + proton.pruneremoveddocuments.usebucketexecutor), DocumentDBHeartBeatConfig(), vespalib::from_s(proton.grouping.sessionmanager.pruning.interval), visibilityDelay, diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp b/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp index 0dc3405d5ce..8f69fe84b3d 100644 --- a/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp @@ -8,6 +8,7 @@ #include "maintenance_jobs_injector.h" #include "prune_session_cache_job.h" #include "pruneremoveddocumentsjob.h" +#include "pruneremoveddocumentsjob_v2.h" #include "sample_attribute_usage_job.h" #include <vespa/searchcore/proton/attribute/attribute_config_inspector.h> @@ -97,9 +98,20 @@ MaintenanceJobsInjector::injectJobs(MaintenanceController &controller, const auto & docTypeName = controller.getDocTypeName().getName(); const MaintenanceDocumentSubDB &mRemSubDB(controller.getRemSubDB()); - auto pruneRDjob = std::make_unique<PruneRemovedDocumentsJob>(config.getPruneRemovedDocumentsConfig(), *mRemSubDB.meta_store(), - mRemSubDB.sub_db_id(), docTypeName, prdHandler, fbHandler); - controller.registerJobInMasterThread(trackJob(jobTrackers.getRemovedDocumentsPrune(), std::move(pruneRDjob))); + if (config.getPruneRemovedDocumentsConfig().useBucketExecutor()) { + controller.registerJobInMasterThread( + trackJob(jobTrackers.getRemovedDocumentsPrune(), + PruneRemovedDocumentsJobV2::create(config.getPruneRemovedDocumentsConfig(), controller.retainDB(), + *mRemSubDB.meta_store(), mRemSubDB.sub_db_id(), bucketSpace, + docTypeName, prdHandler, controller.masterThread(), + bucketExecutor))); + } else { + controller.registerJobInMasterThread( + trackJob(jobTrackers.getRemovedDocumentsPrune(), + std::make_unique<PruneRemovedDocumentsJob>(config.getPruneRemovedDocumentsConfig(), + *mRemSubDB.meta_store(), mRemSubDB.sub_db_id(), + docTypeName, prdHandler, fbHandler))); + } if (!config.getLidSpaceCompactionConfig().isDisabled()) { ILidSpaceCompactionHandler::Vector lidSpaceCompactionHandlers; diff --git a/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob.cpp b/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob.cpp index dfb84af5da5..d8c4b6d1f71 100644 --- a/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob.cpp @@ -16,7 +16,7 @@ using storage::spi::Timestamp; namespace proton { PruneRemovedDocumentsJob:: -PruneRemovedDocumentsJob(const Config &config, +PruneRemovedDocumentsJob(const DocumentDBPruneConfig &config, const IDocumentMetaStore &metaStore, uint32_t subDbId, const vespalib::string &docTypeName, diff --git a/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob.h b/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob.h index 76967635f4a..0f756317ab4 100644 --- a/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob.h +++ b/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob.h @@ -31,9 +31,7 @@ private: void flush(DocId lowLid, DocId nextLowLid, const storage::spi::Timestamp ageLimit); public: - using Config = DocumentDBPruneRemovedDocumentsConfig; - - PruneRemovedDocumentsJob(const Config &config, + PruneRemovedDocumentsJob(const DocumentDBPruneConfig &config, const IDocumentMetaStore &metaStore, uint32_t subDbId, const vespalib::string &docTypeName, diff --git a/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob_v2.cpp b/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob_v2.cpp new file mode 100644 index 00000000000..f2cb6f3c270 --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob_v2.cpp @@ -0,0 +1,113 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "pruneremoveddocumentsjob_v2.h" +#include "ipruneremoveddocumentshandler.h" +#include <vespa/persistence/spi/bucket_tasks.h> +#include <vespa/searchcore/proton/feedoperation/pruneremoveddocumentsoperation.h> +#include <vespa/searchcore/proton/documentmetastore/i_document_meta_store.h> +#include <vespa/searchcorespi/index/i_thread_service.h> +#include <vespa/vespalib/util/destructor_callbacks.h> +#include <vespa/vespalib/util/lambdatask.h> + +#include <vespa/log/log.h> +LOG_SETUP(".proton.server.pruneremoveddocumentsjob"); + +using document::BucketId; +using storage::spi::Timestamp; +using storage::spi::Bucket; +using vespalib::IDestructorCallback; +using vespalib::makeLambdaTask; + +namespace proton { + +PruneRemovedDocumentsJobV2:: +PruneRemovedDocumentsJobV2(const DocumentDBPruneConfig &config, RetainGuard dbRetainer, const IDocumentMetaStore &metaStore, + uint32_t subDbId, document::BucketSpace bucketSpace, const vespalib::string &docTypeName, + IPruneRemovedDocumentsHandler &handler, IThreadService & master, + BucketExecutor & bucketExecutor) + : BlockableMaintenanceJob("prune_removed_documents." + docTypeName, + config.getDelay(), config.getInterval()), + _metaStore(metaStore), + _handler(handler), + _master(master), + _bucketExecutor(bucketExecutor), + _docTypeName(docTypeName), + _dbRetainer(std::move(dbRetainer)), + _cfgAgeLimit(config.getAge()), + _subDbId(subDbId), + _bucketSpace(bucketSpace), + _stopped(false), + _nextLid(1u) +{ +} + +class PruneRemovedDocumentsJobV2::PruneTask : public storage::spi::BucketTask { +public: + PruneTask(std::shared_ptr<PruneRemovedDocumentsJobV2> job, uint32_t lid, const RawDocumentMetaData & meta, IDestructorCallback::SP opsTracker) + : _job(std::move(job)), + _lid(lid), + _meta(meta), + _opsTracker(std::move(opsTracker)) + { } + void run(const Bucket & bucket, IDestructorCallback::SP onDone) override; + void fail(const Bucket & bucket) override { + assert(bucket.getBucketId() == _meta.getBucketId()); + } +private: + std::shared_ptr<PruneRemovedDocumentsJobV2> _job; + uint32_t _lid; + const RawDocumentMetaData _meta; + IDestructorCallback::SP _opsTracker; +}; + +void +PruneRemovedDocumentsJobV2::PruneTask::run(const Bucket & bucket, IDestructorCallback::SP onDone) { + assert(bucket.getBucketId() == _meta.getBucketId()); + using DoneContext = vespalib::KeepAlive<std::pair<IDestructorCallback::SP, IDestructorCallback::SP>>; + auto & job = *_job; + job._master.execute(makeLambdaTask([job = std::move(_job), lid=_lid, meta = _meta, + onDone = std::make_shared<DoneContext>(std::make_pair(std::move(_opsTracker), std::move(onDone))) + ]() { + (void) onDone; + job->remove(lid, meta); + })); +} + +void +PruneRemovedDocumentsJobV2::remove(uint32_t lid, const RawDocumentMetaData & oldMeta) { + if (_stopped.load(std::memory_order_relaxed)) return; + if ( ! _metaStore.validLid(lid)) return; + const RawDocumentMetaData &meta = _metaStore.getRawMetaData(lid); + if (meta.getBucketId() != oldMeta.getBucketId()) return; + if (meta.getTimestamp() != oldMeta.getTimestamp()) return; + if (meta.getGid() != oldMeta.getGid()) return; + + PruneRemovedDocumentsOperation pruneOp(_metaStore.getCommittedDocIdLimit(), _subDbId); + pruneOp.getLidsToRemove()->addLid(lid); + _handler.performPruneRemovedDocuments(pruneOp); +} + +bool +PruneRemovedDocumentsJobV2::run() +{ + vespalib::system_time now = vespalib::system_clock::now(); + const Timestamp ageLimit(static_cast<Timestamp::Type> + (vespalib::count_us(now.time_since_epoch() - _cfgAgeLimit))); + const DocId docIdLimit(_metaStore.getCommittedDocIdLimit()); + const DocId lidLimit = std::min(_nextLid + 1000000u, docIdLimit); + for (uint32_t removed = 0; removed < 1000 && _nextLid < lidLimit; _nextLid++) { + if ( ! _metaStore.validLid(_nextLid)) continue; + const RawDocumentMetaData &meta = _metaStore.getRawMetaData(_nextLid); + if (meta.getTimestamp() >= ageLimit) continue; + + _bucketExecutor.execute(Bucket(document::Bucket(_bucketSpace, meta.getBucketId())), + std::make_unique<PruneTask>(shared_from_this(), _nextLid, meta, getLimiter().beginOperation())); + } + if (_nextLid >= docIdLimit) { + _nextLid = 1u; + return true; + } + return false; +} + +} // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob_v2.h b/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob_v2.h new file mode 100644 index 00000000000..98330c75d5c --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob_v2.h @@ -0,0 +1,68 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include "blockable_maintenance_job.h" +#include "document_db_maintenance_config.h" +#include <vespa/searchcore/proton/common/monitored_refcount.h> +#include <persistence/spi/types.h> +#include <vespa/document/bucket/bucketspace.h> +#include <atomic> + +namespace storage::spi { struct BucketExecutor; } +namespace searchcorespi::index { struct IThreadService; } + +namespace proton { + +struct IDocumentMetaStore; +class IPruneRemovedDocumentsHandler; +class RawDocumentMetaData; + +/** + * Job that regularly checks whether old removed documents should be + * forgotten. + */ +class PruneRemovedDocumentsJobV2 : public BlockableMaintenanceJob, + public std::enable_shared_from_this<PruneRemovedDocumentsJobV2> +{ +private: + class PruneTask; + using Config = DocumentDBPruneRemovedDocumentsConfig; + using BucketExecutor = storage::spi::BucketExecutor; + using IThreadService = searchcorespi::index::IThreadService; + using DocId = uint32_t; + + const IDocumentMetaStore &_metaStore; // external ownership + IPruneRemovedDocumentsHandler &_handler; + IThreadService &_master; + BucketExecutor &_bucketExecutor; + const vespalib::string _docTypeName; + RetainGuard _dbRetainer; + const vespalib::duration _cfgAgeLimit; + const uint32_t _subDbId; + const document::BucketSpace _bucketSpace; + std::atomic<bool> _stopped; + + DocId _nextLid; + + void remove(uint32_t lid, const RawDocumentMetaData & meta); + + PruneRemovedDocumentsJobV2(const DocumentDBPruneConfig &config, RetainGuard dbRetainer, const IDocumentMetaStore &metaStore, + uint32_t subDbId, document::BucketSpace bucketSpace, const vespalib::string &docTypeName, + IPruneRemovedDocumentsHandler &handler, IThreadService & master, + BucketExecutor & bucketExecutor); + bool run() override; + void onStop() override { _stopped = true; } +public: + static std::shared_ptr<PruneRemovedDocumentsJobV2> + create(const Config &config, RetainGuard dbRetainer, const IDocumentMetaStore &metaStore, uint32_t subDbId, + document::BucketSpace bucketSpace, const vespalib::string &docTypeName, + IPruneRemovedDocumentsHandler &handler, IThreadService & master, BucketExecutor & bucketExecutor) + { + return std::shared_ptr<PruneRemovedDocumentsJobV2>( + new PruneRemovedDocumentsJobV2(config, std::move(dbRetainer), metaStore, subDbId, bucketSpace, + docTypeName, handler, master, bucketExecutor)); + } +}; + +} // namespace proton + |