diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2021-01-23 22:26:52 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-01-23 22:26:52 +0100 |
commit | f184afbbbb011782776624f50eb5add144d5dbd2 (patch) | |
tree | b4990e905607d28e4a0b73e06bc4bb05a5265943 | |
parent | 0d43351fa8c26eb7d319c0da6a7867d431f0c50e (diff) | |
parent | 1c04490eab735966b2581ef526177917613f9643 (diff) |
Merge pull request #16185 from vespa-engine/revert-16184-revert-16183-balder/wire-in-bucket-executor
Revert "Revert "Wire in the BucketExecutor.""
18 files changed, 100 insertions, 43 deletions
diff --git a/searchcore/src/apps/tests/persistenceconformance_test.cpp b/searchcore/src/apps/tests/persistenceconformance_test.cpp index da194d1ff7b..6b4061081ea 100644 --- a/searchcore/src/apps/tests/persistenceconformance_test.cpp +++ b/searchcore/src/apps/tests/persistenceconformance_test.cpp @@ -9,6 +9,7 @@ #include <vespa/document/base/testdocman.h> #include <vespa/fastos/file.h> #include <vespa/persistence/conformancetest/conformancetest.h> +#include <vespa/persistence/dummyimpl/dummy_bucket_executor.h> #include <vespa/document/repo/documenttyperepo.h> #include <vespa/document/test/make_bucket_space.h> #include <vespa/searchcommon/common/schemaconfigurer.h> @@ -168,6 +169,7 @@ private: mutable DummyWireService _metricsWireService; mutable MemoryConfigStores _config_stores; vespalib::ThreadStackExecutor _summaryExecutor; + storage::spi::dummy::DummyBucketExecutor _bucketExecutor; public: DocumentDBFactory(const vespalib::string &baseDir, int tlsListenPort); @@ -206,6 +208,7 @@ public: const_cast<DocumentDBFactory &>(*this), _summaryExecutor, _summaryExecutor, + _bucketExecutor, _tls, _metricsWireService, _fileHeaderContext, @@ -225,7 +228,8 @@ DocumentDBFactory::DocumentDBFactory(const vespalib::string &baseDir, int tlsLis _queryLimiter(), _clock(), _metricsWireService(), - _summaryExecutor(8, 128 * 1024) + _summaryExecutor(8, 128 * 1024), + _bucketExecutor(2) {} DocumentDBFactory::~DocumentDBFactory() = default; diff --git a/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp index 861b2ad5307..90942e9aef4 100644 --- a/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp +++ b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp @@ -805,6 +805,7 @@ PersistenceProviderFixture::create_document_db(const BMParams & params) _document_db_owner, _summary_executor, _summary_executor, + *_persistence_engine, _tls, _metrics_wire_service, _file_header_context, diff --git a/searchcore/src/tests/proton/docsummary/docsummary.cpp b/searchcore/src/tests/proton/docsummary/docsummary.cpp index e84cf338bcd..38e5e08ec02 100644 --- a/searchcore/src/tests/proton/docsummary/docsummary.cpp +++ b/searchcore/src/tests/proton/docsummary/docsummary.cpp @@ -25,6 +25,7 @@ #include <vespa/searchcore/proton/server/searchview.h> #include <vespa/searchcore/proton/server/summaryadapter.h> #include <vespa/searchcore/proton/matching/querylimiter.h> +#include <vespa/persistence/dummyimpl/dummy_bucket_executor.h> #include <vespa/vespalib/util/destructor_callbacks.h> #include <vespa/searchlib/engine/docsumapi.h> #include <vespa/searchlib/index/docbuilder.h> @@ -173,6 +174,7 @@ public: DummyFileHeaderContext _fileHeaderContext; TransLogServer _tls; vespalib::ThreadStackExecutor _summaryExecutor; + storage::spi::dummy::DummyBucketExecutor _bucketExecutor; bool _mkdirOk; matching::QueryLimiter _queryLimiter; vespalib::Clock _clock; @@ -192,6 +194,7 @@ public: _fileHeaderContext(), _tls("tmp", 9013, ".", _fileHeaderContext), _summaryExecutor(8, 128*1024), + _bucketExecutor(2), _mkdirOk(FastOS_File::MakeDirectory("tmpdb")), _queryLimiter(), _clock(), @@ -219,7 +222,7 @@ public: } _ddb = std::make_unique<DocumentDB>("tmpdb", _configMgr.getConfig(), "tcp/localhost:9013", _queryLimiter, _clock, DocTypeName(docTypeName), makeBucketSpace(), *b->getProtonConfigSP(), *this, - _summaryExecutor, _summaryExecutor, _tls, _dummy, _fileHeaderContext, + _summaryExecutor, _summaryExecutor, _bucketExecutor, _tls, _dummy, _fileHeaderContext, std::make_unique<MemoryConfigStore>(), std::make_shared<vespalib::ThreadStackExecutor>(16, 128 * 1024), _hwInfo), _ddb->start(); diff --git a/searchcore/src/tests/proton/documentdb/documentdb_test.cpp b/searchcore/src/tests/proton/documentdb/documentdb_test.cpp index 4064fc24105..91046fbb567 100644 --- a/searchcore/src/tests/proton/documentdb/documentdb_test.cpp +++ b/searchcore/src/tests/proton/documentdb/documentdb_test.cpp @@ -19,6 +19,7 @@ #include <vespa/searchcore/proton/server/document_db_explorer.h> #include <vespa/searchcore/proton/server/documentdbconfigmanager.h> #include <vespa/searchcore/proton/server/memoryconfigstore.h> +#include <vespa/persistence/dummyimpl/dummy_bucket_executor.h> #include <vespa/searchcorespi/index/indexflushtarget.h> #include <vespa/searchlib/attribute/attribute_read_guard.h> #include <vespa/searchlib/index/dummyfileheadercontext.h> @@ -69,6 +70,7 @@ struct Fixture { MyDBOwner _myDBOwner; vespalib::ThreadStackExecutor _summaryExecutor; HwInfo _hwInfo; + storage::spi::dummy::DummyBucketExecutor _bucketExecutor; DocumentDB::SP _db; DummyFileHeaderContext _fileHeaderContext; TransLogServer _tls; @@ -84,31 +86,31 @@ Fixture::Fixture() _myDBOwner(), _summaryExecutor(8, 128*1024), _hwInfo(), + _bucketExecutor(2), _db(), _fileHeaderContext(), _tls("tmp", 9014, ".", _fileHeaderContext), _queryLimiter(), _clock() { - DocumentDBConfig::DocumenttypesConfigSP documenttypesConfig(new DocumenttypesConfig()); + auto documenttypesConfig = std::make_shared<DocumenttypesConfig>(); DocumentType docType("typea", 0); - std::shared_ptr<const DocumentTypeRepo> repo(new DocumentTypeRepo(docType)); - TuneFileDocumentDB::SP tuneFileDocumentDB(new TuneFileDocumentDB); + auto repo = std::make_shared<DocumentTypeRepo>(docType); + auto tuneFileDocumentDB = std::make_shared<TuneFileDocumentDB>(); config::DirSpec spec(TEST_PATH("cfg")); DocumentDBConfigHelper mgr(spec, "typea"); - BootstrapConfig::SP - b(new BootstrapConfig(1, documenttypesConfig, repo, + auto b = std::make_shared<BootstrapConfig>(1, documenttypesConfig, repo, std::make_shared<ProtonConfig>(), std::make_shared<FiledistributorrpcConfig>(), std::make_shared<BucketspacesConfig>(), - tuneFileDocumentDB, HwInfo())); + tuneFileDocumentDB, HwInfo()); mgr.forwardConfig(b); mgr.nextGeneration(0ms); - _db.reset(new DocumentDB(".", mgr.getConfig(), "tcp/localhost:9014", _queryLimiter, _clock, DocTypeName("typea"), + _db = std::make_shared<DocumentDB>(".", mgr.getConfig(), "tcp/localhost:9014", _queryLimiter, _clock, DocTypeName("typea"), makeBucketSpace(), - *b->getProtonConfigSP(), _myDBOwner, _summaryExecutor, _summaryExecutor, _tls, _dummy, - _fileHeaderContext, ConfigStore::UP(new MemoryConfigStore), - std::make_shared<vespalib::ThreadStackExecutor>(16, 128 * 1024), _hwInfo)); + *b->getProtonConfigSP(), _myDBOwner, _summaryExecutor, _summaryExecutor, _bucketExecutor, _tls, _dummy, + _fileHeaderContext, std::make_unique<MemoryConfigStore>(), + std::make_shared<vespalib::ThreadStackExecutor>(16, 128 * 1024), _hwInfo); _db->start(); _db->waitForOnlineState(); } diff --git a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.cpp b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.cpp index 926530c228e..af8086941fd 100644 --- a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.cpp +++ b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.cpp @@ -46,23 +46,27 @@ JobTestBase::init(uint32_t allowedLidBloat, { _handler = std::make_unique<MyHandler>(maxOutstandingMoveOps != MAX_OUTSTANDING_MOVE_OPS, useBucketDB()); DocumentDBLidSpaceCompactionConfig compactCfg(interval, allowedLidBloat, allowedLidBloatFactor, - REMOVE_BATCH_BLOCK_RATE, REMOVE_BLOCK_RATE, false); + REMOVE_BATCH_BLOCK_RATE, REMOVE_BLOCK_RATE, false, useBucketDB()); BlockableMaintenanceJobConfig blockableCfg(resourceLimitFactor, maxOutstandingMoveOps); if (useBucketDB()) { + _singleExecutor = std::make_unique<vespalib::ThreadStackExecutor>(1, 0x10000); + _master = std::make_unique<proton::ExecutorThreadService> (*_singleExecutor); _bucketExecutor = std::make_unique<storage::spi::dummy::DummyBucketExecutor>(4); - _job = std::make_unique<lidspace::CompactionJob>(compactCfg, *_handler, _storer, *_bucketExecutor, _diskMemUsageNotifier, - blockableCfg, _clusterStateHandler, nodeRetired, + _job = std::make_unique<lidspace::CompactionJob>(compactCfg, *_handler, _storer, *_master, *_bucketExecutor, + _diskMemUsageNotifier, blockableCfg, _clusterStateHandler, nodeRetired, document::BucketSpace::placeHolder()); } else { _job = std::make_unique<LidSpaceCompactionJob>(compactCfg, *_handler, _storer, _frozenHandler, _diskMemUsageNotifier, blockableCfg, _clusterStateHandler, nodeRetired); } } + void JobTestBase::sync() const { if (_bucketExecutor) { _bucketExecutor->sync(); + _master->sync(); } } diff --git a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.h b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.h index 637314e2879..c81f8c8d387 100644 --- a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.h +++ b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.h @@ -7,6 +7,8 @@ struct JobTestBase : public ::testing::TestWithParam<bool> { std::unique_ptr<storage::spi::BucketExecutor> _bucketExecutor; + std::unique_ptr<vespalib::SyncableThreadExecutor> _singleExecutor; + std::unique_ptr<searchcorespi::index::IThreadService> _master; std::unique_ptr<MyHandler> _handler; MyStorer _storer; MyFrozenBucketHandler _frozenHandler; diff --git a/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp b/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp index d6fddf16659..ab447d5d38f 100644 --- a/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp +++ b/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp @@ -862,14 +862,10 @@ void MaintenanceControllerFixture::injectMaintenanceJobs() { if (_injectDefaultJobs) { - MaintenanceJobsInjector::injectJobs(_mc, *_mcCfg, _fh, _gsp, - _lscHandlers, _fh, _mc, _bucketCreateNotifier, _docTypeName.getName(), makeBucketSpace(), - _fh, _fh, _bmc, _clusterStateHandler, _bucketHandler, - _calc, - _diskMemUsageNotifier, - _jobTrackers, - _readyAttributeManager, - _notReadyAttributeManager, + MaintenanceJobsInjector::injectJobs(_mc, *_mcCfg, _bucketExecutor, _fh, _gsp, _lscHandlers, _fh, _mc, + _bucketCreateNotifier, _docTypeName.getName(), makeBucketSpace(), _fh, _fh, + _bmc, _clusterStateHandler, _bucketHandler, _calc, _diskMemUsageNotifier, + _jobTrackers, _readyAttributeManager, _notReadyAttributeManager, std::make_unique<const AttributeConfigInspector>(AttributesConfigBuilder()), std::make_shared<TransientMemoryUsageProvider>(), _attributeUsageFilter); diff --git a/searchcore/src/vespa/searchcore/config/proton.def b/searchcore/src/vespa/searchcore/config/proton.def index c12500570bc..17d0ef8ad37 100644 --- a/searchcore/src/vespa/searchcore/config/proton.def +++ b/searchcore/src/vespa/searchcore/config/proton.def @@ -394,6 +394,9 @@ lidspacecompaction.removebatchblockrate double default=0.5 ## It is considered again at the next regular interval (see above). lidspacecompaction.removeblockrate double default=100.0 +## Set to true to enable bucket locking via content layer +lidspacecompaction.usebucketexecutor bool default=false + ## This is the maximum value visibilitydelay you can have. ## A to higher value here will cost more memory while not improving too much. maxvisibilitydelay double default=1.0 diff --git a/searchcore/src/vespa/searchcore/proton/server/document_db_maintenance_config.cpp b/searchcore/src/vespa/searchcore/proton/server/document_db_maintenance_config.cpp index f365b6ae02d..2f41ca83a74 100644 --- a/searchcore/src/vespa/searchcore/proton/server/document_db_maintenance_config.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/document_db_maintenance_config.cpp @@ -55,7 +55,8 @@ DocumentDBLidSpaceCompactionConfig::DocumentDBLidSpaceCompactionConfig() _allowedLidBloatFactor(1.0), _remove_batch_block_rate(0.5), _remove_block_rate(100), - _disabled(false) + _disabled(false), + _useBucketExecutor(false) { } @@ -64,14 +65,16 @@ DocumentDBLidSpaceCompactionConfig::DocumentDBLidSpaceCompactionConfig(vespalib: double allowedLidBloatFactor, double remove_batch_block_rate, double remove_block_rate, - bool disabled) + bool disabled, + bool useBucketExecutor) : _delay(std::min(MAX_DELAY_SEC, interval)), _interval(interval), _allowedLidBloat(allowedLidBloat), _allowedLidBloatFactor(allowedLidBloatFactor), _remove_batch_block_rate(remove_batch_block_rate), _remove_block_rate(remove_block_rate), - _disabled(disabled) + _disabled(disabled), + _useBucketExecutor(useBucketExecutor) { } diff --git a/searchcore/src/vespa/searchcore/proton/server/document_db_maintenance_config.h b/searchcore/src/vespa/searchcore/proton/server/document_db_maintenance_config.h index 731add1f62c..9bdb8fa0ae6 100644 --- a/searchcore/src/vespa/searchcore/proton/server/document_db_maintenance_config.h +++ b/searchcore/src/vespa/searchcore/proton/server/document_db_maintenance_config.h @@ -50,6 +50,7 @@ private: double _remove_batch_block_rate; double _remove_block_rate; bool _disabled; + bool _useBucketExecutor; public: DocumentDBLidSpaceCompactionConfig(); @@ -58,7 +59,8 @@ public: double allowwedLidBloatFactor, double remove_batch_block_rate, double remove_block_rate, - bool disabled); + bool disabled, + bool useBucketExecutor); static DocumentDBLidSpaceCompactionConfig createDisabled(); bool operator==(const DocumentDBLidSpaceCompactionConfig &rhs) const; @@ -69,6 +71,7 @@ public: double get_remove_batch_block_rate() const { return _remove_batch_block_rate; } double get_remove_block_rate() const { return _remove_block_rate; } bool isDisabled() const { return _disabled; } + bool useBucketExecutor() const { return _useBucketExecutor; } }; class BlockableMaintenanceJobConfig { diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp index 7c4296db304..7fb16e851fa 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp @@ -134,6 +134,7 @@ DocumentDB::DocumentDB(const vespalib::string &baseDir, IDocumentDBOwner &owner, vespalib::SyncableThreadExecutor &warmupExecutor, vespalib::ThreadStackExecutorBase &sharedExecutor, + storage::spi::BucketExecutor & bucketExecutor, const search::transactionlog::WriterFactory &tlsWriterFactory, MetricsWireService &metricsWireService, const FileHeaderContext &fileHeaderContext, @@ -174,6 +175,7 @@ DocumentDB::DocumentDB(const vespalib::string &baseDir, _refCount(), _syncFeedViewEnabled(false), _owner(owner), + _bucketExecutor(bucketExecutor), _state(), _dmUsageForwarder(_writeService.master()), _writeFilter(), @@ -934,6 +936,7 @@ DocumentDB::injectMaintenanceJobs(const DocumentDBMaintenanceConfig &config, std _maintenanceController.killJobs(); MaintenanceJobsInjector::injectJobs(_maintenanceController, config, + _bucketExecutor, *_feedHandler, // IHeartBeatHandler *_sessionManager, // ISessionCachePruner _lidSpaceCompactionHandlers, diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.h b/searchcore/src/vespa/searchcore/proton/server/documentdb.h index 9ea88360b07..6ed876f197d 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentdb.h +++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.h @@ -45,6 +45,7 @@ namespace metrics { class UpdateHook; class MetricLockGuard; } +namespace storage::spi { struct BucketExecutor; } namespace proton { class AttributeConfigInspector; @@ -115,6 +116,7 @@ private: MonitoredRefCount _refCount; bool _syncFeedViewEnabled; IDocumentDBOwner &_owner; + storage::spi::BucketExecutor &_bucketExecutor; DDBState _state; DiskMemUsageForwarder _dmUsageForwarder; AttributeUsageFilter _writeFilter; @@ -233,6 +235,7 @@ public: IDocumentDBOwner &owner, vespalib::SyncableThreadExecutor &warmupExecutor, vespalib::ThreadStackExecutorBase &sharedExecutor, + storage::spi::BucketExecutor & bucketExecutor, const search::transactionlog::WriterFactory &tlsWriterFactory, MetricsWireService &metricsWireService, const search::common::FileHeaderContext &fileHeaderContext, diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdbconfigmanager.cpp b/searchcore/src/vespa/searchcore/proton/server/documentdbconfigmanager.cpp index c5587b33f6b..115a49fe997 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentdbconfigmanager.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/documentdbconfigmanager.cpp @@ -142,7 +142,8 @@ buildMaintenanceConfig(const BootstrapConfig::SP &bootstrapConfig, proton.lidspacecompaction.allowedlidbloatfactor, proton.lidspacecompaction.removebatchblockrate, proton.lidspacecompaction.removeblockrate, - isDocumentTypeGlobal), + isDocumentTypeGlobal, + proton.lidspacecompaction.usebucketexecutor), AttributeUsageFilterConfig( proton.writefilter.attribute.enumstorelimit, proton.writefilter.attribute.multivaluelimit), diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.cpp b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.cpp index 51b84d5ec6e..5ccc2793d6b 100644 --- a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.cpp @@ -5,14 +5,17 @@ #include "i_lid_space_compaction_handler.h" #include "i_operation_storer.h" #include <vespa/searchcore/proton/feedoperation/moveoperation.h> +#include <vespa/searchcorespi/index/i_thread_service.h> #include <vespa/persistence/spi/bucket_tasks.h> #include <vespa/vespalib/util/destructor_callbacks.h> +#include <vespa/vespalib/util/lambdatask.h> #include <cassert> using search::DocumentMetaData; using search::LidUsageStats; using storage::spi::makeBucketTask; using storage::spi::Bucket; +using vespalib::makeLambdaTask; namespace proton::lidspace { @@ -44,16 +47,20 @@ CompactionJob::moveDocument(const search::DocumentMetaData & metaThen, std::shar if (metaNow.lid != metaThen.lid) return; if (metaNow.bucketId != metaThen.bucketId) return; - MoveOperation::UP op = _handler.createMoveOperation(metaNow, _handler.getLidStatus().getLowestFreeLid()); + auto op = _handler.createMoveOperation(metaNow, 0); // The real lid must be sampled in the master thread. if (!op) return; - _opStorer.appendOperation(*op, context); - _handler.handleMove(*op, std::move(context)); + _master.execute(makeLambdaTask([this, moveOp=std::move(op), onDone=std::move(context)]() { + moveOp->setTargetLid(_handler.getLidStatus().getLowestFreeLid()); + _opStorer.appendOperation(*moveOp, onDone); + _handler.handleMove(*moveOp, std::move(onDone)); + })); } CompactionJob::CompactionJob(const DocumentDBLidSpaceCompactionConfig &config, ILidSpaceCompactionHandler &handler, IOperationStorer &opStorer, + IThreadService & master, BucketExecutor & bucketExecutor, IDiskMemUsageNotifier &diskMemUsageNotifier, const BlockableMaintenanceJobConfig &blockableConfig, @@ -62,6 +69,7 @@ CompactionJob::CompactionJob(const DocumentDBLidSpaceCompactionConfig &config, document::BucketSpace bucketSpace) : LidSpaceCompactionJobBase(config, handler, opStorer, diskMemUsageNotifier, blockableConfig, clusterStateChangedNotifier, nodeRetired), + _master(master), _bucketExecutor(bucketExecutor), _bucketSpace(bucketSpace) { diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.h b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.h index 96e6f0a0b7e..3a61ee85a87 100644 --- a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.h +++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.h @@ -6,6 +6,7 @@ #include <vespa/document/bucket/bucketspace.h> namespace storage::spi { struct BucketExecutor;} +namespace searchcorespi::index { class IThreadService; } namespace proton { class IDiskMemUsageNotifier; class IClusterStateChangedNotifier; @@ -22,8 +23,10 @@ class CompactionJob : public LidSpaceCompactionJobBase private: using BucketExecutor = storage::spi::BucketExecutor; using IDestructorCallback = vespalib::IDestructorCallback; - BucketExecutor &_bucketExecutor; - document::BucketSpace _bucketSpace; + using IThreadService = searchcorespi::index::IThreadService; + IThreadService & _master; + BucketExecutor &_bucketExecutor; + document::BucketSpace _bucketSpace; bool scanDocuments(const search::LidUsageStats &stats) override; void moveDocument(const search::DocumentMetaData & meta, std::shared_ptr<IDestructorCallback> onDone); @@ -32,6 +35,7 @@ public: CompactionJob(const DocumentDBLidSpaceCompactionConfig &config, ILidSpaceCompactionHandler &handler, IOperationStorer &opStorer, + IThreadService & master, BucketExecutor & bucketExecutor, IDiskMemUsageNotifier &diskMemUsageNotifier, const BlockableMaintenanceJobConfig &blockableConfig, diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp b/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp index c9ac5dd3c49..aa04cc89f52 100644 --- a/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp @@ -4,6 +4,7 @@ #include "heart_beat_job.h" #include "job_tracked_maintenance_job.h" #include "lid_space_compaction_job.h" +#include "lid_space_compaction_job_take2.h" #include "maintenance_jobs_injector.h" #include "prune_session_cache_job.h" #include "pruneremoveddocumentsjob.h" @@ -25,6 +26,7 @@ trackJob(const IJobTracker::SP &tracker, IMaintenanceJob::UP job) void injectLidSpaceCompactionJobs(MaintenanceController &controller, const DocumentDBMaintenanceConfig &config, + storage::spi::BucketExecutor & bucketExecutor, const ILidSpaceCompactionHandler::Vector &lscHandlers, IOperationStorer &opStorer, IFrozenBucketHandler &fbHandler, @@ -34,15 +36,27 @@ injectLidSpaceCompactionJobs(MaintenanceController &controller, const std::shared_ptr<IBucketStateCalculator> &calc, document::BucketSpace bucketSpace) { - (void) bucketSpace; for (auto &lidHandler : lscHandlers) { - IMaintenanceJob::UP job = std::make_unique<LidSpaceCompactionJob>( - config.getLidSpaceCompactionConfig(), - *lidHandler, opStorer, fbHandler, - diskMemUsageNotifier, - config.getBlockableJobConfig(), - clusterStateChangedNotifier, - (calc ? calc->nodeRetired() : false)); + + IMaintenanceJob::UP job; + if (config.getLidSpaceCompactionConfig().useBucketExecutor()) { + job = std::make_unique<lidspace::CompactionJob>( + config.getLidSpaceCompactionConfig(), + *lidHandler, opStorer, controller.masterThread(), bucketExecutor, + diskMemUsageNotifier, + config.getBlockableJobConfig(), + clusterStateChangedNotifier, + (calc ? calc->nodeRetired() : false), + bucketSpace); + } else { + job = std::make_unique<LidSpaceCompactionJob>( + config.getLidSpaceCompactionConfig(), + *lidHandler, opStorer, fbHandler, + diskMemUsageNotifier, + config.getBlockableJobConfig(), + clusterStateChangedNotifier, + (calc ? calc->nodeRetired() : false)); + } controller.registerJobInMasterThread(trackJob(tracker, std::move(job))); } } @@ -82,6 +96,7 @@ injectBucketMoveJob(MaintenanceController &controller, void MaintenanceJobsInjector::injectJobs(MaintenanceController &controller, const DocumentDBMaintenanceConfig &config, + storage::spi::BucketExecutor & bucketExecutor, IHeartBeatHandler &hbHandler, matching::ISessionCachePruner &scPruner, const ILidSpaceCompactionHandler::Vector &lscHandlers, @@ -111,7 +126,7 @@ MaintenanceJobsInjector::injectJobs(MaintenanceController &controller, controller.registerJobInMasterThread( trackJob(jobTrackers.getRemovedDocumentsPrune(), std::move(pruneRDjob))); if (!config.getLidSpaceCompactionConfig().isDisabled()) { - injectLidSpaceCompactionJobs(controller, config, lscHandlers, opStorer, fbHandler, + injectLidSpaceCompactionJobs(controller, config, bucketExecutor, lscHandlers, opStorer, fbHandler, jobTrackers.getLidSpaceCompact(), diskMemUsageNotifier, clusterStateChangedNotifier, calc, bucketSpace); } diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.h b/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.h index 3468ec40923..e35f9e1aa48 100644 --- a/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.h +++ b/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.h @@ -9,6 +9,7 @@ #include <vespa/searchcore/proton/matching/isessioncachepruner.h> #include <vespa/searchcore/proton/metrics/documentdb_job_trackers.h> +namespace storage::spi {struct BucketExecutor; } namespace proton { class AttributeConfigInspector; @@ -33,6 +34,7 @@ struct MaintenanceJobsInjector using IAttributeManagerSP = std::shared_ptr<IAttributeManager>; static void injectJobs(MaintenanceController &controller, const DocumentDBMaintenanceConfig &config, + storage::spi::BucketExecutor & bucketExecutor, IHeartBeatHandler &hbHandler, matching::ISessionCachePruner &scPruner, const ILidSpaceCompactionHandler::Vector &lscHandlers, diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.cpp b/searchcore/src/vespa/searchcore/proton/server/proton.cpp index 7d97214244d..6706581f52c 100644 --- a/searchcore/src/vespa/searchcore/proton/server/proton.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/proton.cpp @@ -602,7 +602,7 @@ Proton::addDocumentDB(const document::DocumentType &docType, } auto ret = std::make_shared<DocumentDB>(config.basedir + "/documents", documentDBConfig, config.tlsspec, _queryLimiter, _clock, docTypeName, bucketSpace, config, *this, - *_warmupExecutor, *_sharedExecutor, *_tls->getTransLogServer(), + *_warmupExecutor, *_sharedExecutor, *_persistenceEngine, *_tls->getTransLogServer(), *_metricsEngine, _fileHeaderContext, std::move(config_store), initializeThreads, bootstrapConfig->getHwInfo()); try { |