diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2021-04-23 16:44:52 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2021-04-23 16:44:52 +0000 |
commit | 4e33dbc5e9ab9e8a2269bd5bd0e06e1527313782 (patch) | |
tree | cb46751988090040bcba5ca76cd85567f28ebf6c /searchcore | |
parent | fa9f44a2662e73b3ce97ea83a4ed32dd3baf680f (diff) |
Use a RetainGuard to ensure DocumentDB is not closed until everything has been drained out.
Diffstat (limited to 'searchcore')
24 files changed, 115 insertions, 74 deletions
diff --git a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_v2_test.cpp b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_v2_test.cpp index 450fa7e8318..e844047f686 100644 --- a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_v2_test.cpp +++ b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_v2_test.cpp @@ -42,6 +42,7 @@ struct ControllerFixtureBase : public ::testing::Test DummyBucketExecutor _bucketExecutor; MyMoveHandler _moveHandler; DocumentDBTaggedMetrics _metrics; + MonitoredRefCount _refCount; std::shared_ptr<BucketMoveJobV2> _bmj; MyCountJobRunner _runner; ControllerFixtureBase(const BlockableMaintenanceJobConfig &blockableConfig, bool storeMoveDoneContexts); @@ -123,7 +124,7 @@ ControllerFixtureBase::ControllerFixtureBase(const BlockableMaintenanceJobConfig _bucketExecutor(4), _moveHandler(*_bucketDB, storeMoveDoneContexts), _metrics("test", 1), - _bmj(BucketMoveJobV2::create(_calc, _moveHandler, _modifiedHandler, _master, _bucketExecutor, _ready._subDb, + _bmj(BucketMoveJobV2::create(_calc, RetainGuard(_refCount), _moveHandler, _modifiedHandler, _master, _bucketExecutor, _ready._subDb, _notReady._subDb, _bucketCreateNotifier,_clusterStateHandler, _bucketHandler, _diskMemUsageNotifier, blockableConfig, "test", makeBucketSpace())), _runner(*_bmj) diff --git a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.cpp b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.cpp index f592aeab9d2..8386385e7b2 100644 --- a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.cpp +++ b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.cpp @@ -56,7 +56,7 @@ JobTestBase::init(uint32_t allowedLidBloat, _singleExecutor = std::make_unique<vespalib::ThreadStackExecutor>(1, 0x10000); _master = std::make_unique<proton::ExecutorThreadService> (*_singleExecutor); _bucketExecutor = std::make_unique<storage::spi::dummy::DummyBucketExecutor>(4); - _job = lidspace::CompactionJob::create(compactCfg, _handler, _storer, *_master, *_bucketExecutor, + _job = lidspace::CompactionJob::create(compactCfg, RetainGuard(_refCount), _handler, _storer, *_master, *_bucketExecutor, _diskMemUsageNotifier, blockableCfg, _clusterStateHandler, nodeRetired, document::BucketSpace::placeHolder()); } else { diff --git a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.h b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.h index 0a4e5c56acb..e74167db189 100644 --- a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.h +++ b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.h @@ -2,6 +2,7 @@ #include "lid_space_common.h" #include <vespa/searchcore/proton/server/blockable_maintenance_job.h> +#include <vespa/searchcore/proton/common/monitored_refcount.h> #include <vespa/persistence/spi/bucketexecutor.h> #include <vespa/searchcorespi/index/i_thread_service.h> #include <vespa/vespalib/gtest/gtest.h> @@ -16,6 +17,7 @@ struct JobTestBase : public ::testing::TestWithParam<bool> { MyFrozenBucketHandler _frozenHandler; test::DiskMemUsageNotifier _diskMemUsageNotifier; test::ClusterStateHandler _clusterStateHandler; + MonitoredRefCount _refCount; std::shared_ptr<BlockableMaintenanceJob> _job; JobTestBase(); ~JobTestBase() override; diff --git a/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp b/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp index 7b225571227..bebdeb2a701 100644 --- a/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp +++ b/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp @@ -6,6 +6,7 @@ #include <vespa/searchcore/proton/attribute/i_attribute_manager.h> #include <vespa/searchcore/proton/bucketdb/bucket_create_notifier.h> #include <vespa/searchcore/proton/common/doctypename.h> +#include <vespa/searchcore/proton/common/monitored_refcount.h> #include <vespa/searchcore/proton/common/transient_resource_usage_provider.h> #include <vespa/searchcore/proton/documentmetastore/operation_listener.h> #include <vespa/searchcore/proton/documentmetastore/documentmetastore.h> @@ -376,6 +377,7 @@ public: AttributeUsageFilter _attributeUsageFilter; test::DiskMemUsageNotifier _diskMemUsageNotifier; BucketCreateNotifier _bucketCreateNotifier; + MonitoredRefCount _refCount; MaintenanceController _mc; MaintenanceControllerFixture(); @@ -794,7 +796,8 @@ MaintenanceControllerFixture::MaintenanceControllerFixture() _notReadyAttributeManager(std::make_shared<MyAttributeManager>()), _attributeUsageFilter(), _bucketCreateNotifier(), - _mc(_threadService, _genericExecutor, _docTypeName) + _refCount(), + _mc(_threadService, _genericExecutor, _refCount, _docTypeName) { std::vector<MyDocumentSubDB *> subDBs; subDBs.push_back(&_ready); diff --git a/searchcore/src/vespa/searchcore/proton/common/monitored_refcount.cpp b/searchcore/src/vespa/searchcore/proton/common/monitored_refcount.cpp index 3cecc8308ad..6b35528318a 100644 --- a/searchcore/src/vespa/searchcore/proton/common/monitored_refcount.cpp +++ b/searchcore/src/vespa/searchcore/proton/common/monitored_refcount.cpp @@ -18,14 +18,14 @@ MonitoredRefCount::~MonitoredRefCount() } void -MonitoredRefCount::retain() +MonitoredRefCount::retain() noexcept { std::lock_guard<std::mutex> guard(_lock); ++_refCount; } void -MonitoredRefCount::release() +MonitoredRefCount::release() noexcept { std::lock_guard<std::mutex> guard(_lock); --_refCount; diff --git a/searchcore/src/vespa/searchcore/proton/common/monitored_refcount.h b/searchcore/src/vespa/searchcore/proton/common/monitored_refcount.h index 44413814f51..3559278c70f 100644 --- a/searchcore/src/vespa/searchcore/proton/common/monitored_refcount.h +++ b/searchcore/src/vespa/searchcore/proton/common/monitored_refcount.h @@ -6,6 +6,7 @@ namespace proton { +class RetainGuard; /* * Class containing a reference count that can be waited on to become zero. * Typically ancestor or member of a class that has to be careful of when @@ -16,13 +17,44 @@ class MonitoredRefCount std::mutex _lock; std::condition_variable _cv; uint32_t _refCount; - + void retain() noexcept; + void release() noexcept; + friend RetainGuard; public: MonitoredRefCount(); virtual ~MonitoredRefCount(); - void retain(); - void release(); void waitForZeroRefCount(); }; +class RetainGuard { +public: + RetainGuard(MonitoredRefCount & refCount) noexcept + : _refCount(&refCount) + { + _refCount->retain(); + } + RetainGuard(const RetainGuard & rhs) = delete; + RetainGuard & operator=(const RetainGuard & rhs) = delete; + RetainGuard(RetainGuard && rhs) noexcept + : _refCount(rhs._refCount) + { + rhs._refCount = nullptr; + } + RetainGuard & operator=(RetainGuard && rhs) noexcept { + release(); + _refCount = rhs._refCount; + rhs._refCount = nullptr; + return *this; + } + ~RetainGuard() { release(); } +private: + void release() noexcept{ + if (_refCount != nullptr) { + _refCount->release(); + _refCount = nullptr; + } + } + MonitoredRefCount * _refCount; +}; + } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/reference/document_db_reference_resolver.cpp b/searchcore/src/vespa/searchcore/proton/reference/document_db_reference_resolver.cpp index fa531385d52..5ffd474834d 100644 --- a/searchcore/src/vespa/searchcore/proton/reference/document_db_reference_resolver.cpp +++ b/searchcore/src/vespa/searchcore/proton/reference/document_db_reference_resolver.cpp @@ -125,11 +125,9 @@ DocumentDBReferenceResolver::listenToGidToLidChanges(const IAttributeManager &at auto &attr = *attrSP; vespalib::string docTypeName = getTargetDocTypeName(attr.getName(), _thisDocType); GidToLidChangeRegistrator ®istrator = getRegistrator(docTypeName); - auto listener = std::make_unique<GidToLidChangeListener>(_attributeFieldWriter, - attrSP, - _refCount, - attr.getName(), - _thisDocType.getName()); + auto listener = std::make_unique<GidToLidChangeListener>(_attributeFieldWriter, attrSP, + RetainGuard(_refCount), + attr.getName(), _thisDocType.getName()); registrator.addListener(std::move(listener)); } } @@ -159,7 +157,6 @@ DocumentDBReferenceResolver::DocumentDBReferenceResolver(const IDocumentDBRefere const DocumentType &thisDocType, const ImportedFieldsConfig &importedFieldsCfg, const document::DocumentType &prevThisDocType, - MonitoredRefCount &refCount, ISequencedTaskExecutor &attributeFieldWriter, bool useReferences) diff --git a/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_listener.cpp b/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_listener.cpp index c54346defd5..cc26aad5f5b 100644 --- a/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_listener.cpp +++ b/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_listener.cpp @@ -7,22 +7,20 @@ namespace proton { GidToLidChangeListener::GidToLidChangeListener(vespalib::ISequencedTaskExecutor &attributeFieldWriter, std::shared_ptr<search::attribute::ReferenceAttribute> attr, - MonitoredRefCount &refCount, + RetainGuard retainGuard, const vespalib::string &name, const vespalib::string &docTypeName) : _attributeFieldWriter(attributeFieldWriter), _executorId(_attributeFieldWriter.getExecutorIdFromName(attr->getNamePrefix())), _attr(std::move(attr)), - _refCount(refCount), + _retainGuard(std::move(retainGuard)), _name(name), _docTypeName(docTypeName) -{ - _refCount.retain(); -} +{ } + GidToLidChangeListener::~GidToLidChangeListener() { _attributeFieldWriter.sync(); - _refCount.release(); } void diff --git a/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_listener.h b/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_listener.h index ae53e674c24..0a1026319a3 100644 --- a/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_listener.h +++ b/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_listener.h @@ -18,14 +18,14 @@ class GidToLidChangeListener : public IGidToLidChangeListener vespalib::ISequencedTaskExecutor &_attributeFieldWriter; vespalib::ISequencedTaskExecutor::ExecutorId _executorId; std::shared_ptr<search::attribute::ReferenceAttribute> _attr; - MonitoredRefCount &_refCount; + RetainGuard _retainGuard; vespalib::string _name; vespalib::string _docTypeName; public: GidToLidChangeListener(vespalib::ISequencedTaskExecutor &attributeFieldWriter, std::shared_ptr<search::attribute::ReferenceAttribute> attr, - MonitoredRefCount &refCount, + RetainGuard refCount, const vespalib::string &name, const vespalib::string &docTypeName); ~GidToLidChangeListener() override; diff --git a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp index 6cd925d1b3d..7d26644944a 100644 --- a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp @@ -54,6 +54,7 @@ blockedDueToClusterState(const std::shared_ptr<IBucketStateCalculator> &calc) } BucketMoveJobV2::BucketMoveJobV2(const std::shared_ptr<IBucketStateCalculator> &calc, + RetainGuard dbRetainer, IDocumentMoveHandler &moveHandler, IBucketModifiedHandler &modifiedHandler, IThreadService & master, @@ -74,6 +75,7 @@ BucketMoveJobV2::BucketMoveJobV2(const std::shared_ptr<IBucketStateCalculator> & IDiskMemUsageListener(), std::enable_shared_from_this<BucketMoveJobV2>(), _calc(calc), + _dbRetainer(std::move(dbRetainer)), _moveHandler(moveHandler), _modifiedHandler(modifiedHandler), _master(master), diff --git a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h index 736ac3a8f40..0f873c8b290 100644 --- a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h +++ b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h @@ -10,6 +10,8 @@ #include "maintenancedocumentsubdb.h" #include <vespa/searchcore/proton/bucketdb/bucketscaniterator.h> #include <vespa/searchcore/proton/bucketdb/i_bucket_create_listener.h> +#include <vespa/searchcore/proton/common/monitored_refcount.h> + namespace storage::spi { struct BucketExecutor; } namespace searchcorespi::index { struct IThreadService; } @@ -57,6 +59,7 @@ private: using Movers = std::vector<BucketMoverSP>; using GuardedMoveOps = BucketMover::GuardedMoveOps; std::shared_ptr<IBucketStateCalculator> _calc; + RetainGuard _dbRetainer; IDocumentMoveHandler &_moveHandler; IBucketModifiedHandler &_modifiedHandler; IThreadService &_master; @@ -78,6 +81,7 @@ private: IDiskMemUsageNotifier &_diskMemUsageNotifier; BucketMoveJobV2(const std::shared_ptr<IBucketStateCalculator> &calc, + RetainGuard dbRetainer, IDocumentMoveHandler &moveHandler, IBucketModifiedHandler &modifiedHandler, IThreadService & master, @@ -112,6 +116,7 @@ private: public: static std::shared_ptr<BucketMoveJobV2> create(const std::shared_ptr<IBucketStateCalculator> &calc, + RetainGuard dbRetainer, IDocumentMoveHandler &moveHandler, IBucketModifiedHandler &modifiedHandler, IThreadService & master, @@ -127,7 +132,7 @@ public: document::BucketSpace bucketSpace) { return std::shared_ptr<BucketMoveJobV2>( - new BucketMoveJobV2(calc, moveHandler, modifiedHandler, master, bucketExecutor, ready, notReady, + new BucketMoveJobV2(calc, std::move(dbRetainer), moveHandler, modifiedHandler, master, bucketExecutor, ready, notReady, bucketCreateNotifier, clusterStateChangedNotifier, bucketStateChangedNotifier, diskMemUsageNotifier, blockableConfig, docTypeName, bucketSpace)); } diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp index 549981330ef..3f3fad55bd6 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp @@ -198,7 +198,7 @@ DocumentDB::DocumentDB(const vespalib::string &baseDir, metricsWireService, getMetrics(), queryLimiter, clock, _configMutex, _baseDir, DocumentSubDBCollection::Config(protonCfg.numsearcherthreads), hwInfo), - _maintenanceController(_writeService.master(), sharedExecutor, _docTypeName), + _maintenanceController(_writeService.master(), sharedExecutor, _refCount, _docTypeName), _jobTrackers(), _calc(), _metricsUpdater(_subDBs, _writeService, _jobTrackers, *_sessionManager, _writeFilter) @@ -408,8 +408,8 @@ DocumentDB::applySubDBConfig(const DocumentDBConfig &newConfigSnapshot, auto newRepo = newConfigSnapshot.getDocumentTypeRepoSP(); auto newDocType = newRepo->getDocumentType(_docTypeName.getName()); assert(newDocType != nullptr); - DocumentDBReferenceResolver resolver(*registry, *newDocType, newConfigSnapshot.getImportedFieldsConfig(), - *oldDocType, _refCount, _writeService.attributeFieldWriter(), _state.getAllowReconfig()); + DocumentDBReferenceResolver resolver(*registry, *newDocType, newConfigSnapshot.getImportedFieldsConfig(), *oldDocType, + _refCount, _writeService.attributeFieldWriter(), _state.getAllowReconfig()); _subDBs.applyConfig(newConfigSnapshot, *_activeConfigSnapshot, serialNum, params, resolver); } @@ -551,13 +551,8 @@ DocumentDB::tearDownReferences() auto repo = activeConfig->getDocumentTypeRepoSP(); auto docType = repo->getDocumentType(_docTypeName.getName()); assert(docType != nullptr); - DocumentDBReferenceResolver resolver(*registry, - *docType, - activeConfig->getImportedFieldsConfig(), - *docType, - _refCount, - _writeService.attributeFieldWriter(), - false); + DocumentDBReferenceResolver resolver(*registry, *docType, activeConfig->getImportedFieldsConfig(), *docType, + _refCount, _writeService.attributeFieldWriter(), false); _subDBs.tearDownReferences(resolver); registry->remove(_docTypeName.getName()); } diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.h b/searchcore/src/vespa/searchcore/proton/server/documentdb.h index b1ba818e03f..ad0225d6f86 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentdb.h +++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.h @@ -385,8 +385,7 @@ public: /** * Reference counting */ - void retain() { _refCount.retain(); } - void release() { _refCount.release(); } + RetainGuard retain() { return RetainGuard(_refCount); } bool getDelayedConfig() const { return _state.getDelayedConfig(); } void replayConfig(SerialNum serialNum) override; diff --git a/searchcore/src/vespa/searchcore/proton/server/flushhandlerproxy.cpp b/searchcore/src/vespa/searchcore/proton/server/flushhandlerproxy.cpp index 0178dec86e4..e205700143b 100644 --- a/searchcore/src/vespa/searchcore/proton/server/flushhandlerproxy.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/flushhandlerproxy.cpp @@ -9,16 +9,12 @@ namespace proton { FlushHandlerProxy::FlushHandlerProxy(const DocumentDB::SP &documentDB) : IFlushHandler(documentDB->getDocTypeName().toString()), - _documentDB(documentDB) -{ - _documentDB->retain(); -} + _documentDB(documentDB), + _retainGuard(_documentDB->retain()) +{ } -FlushHandlerProxy::~FlushHandlerProxy() -{ - _documentDB->release(); -} +FlushHandlerProxy::~FlushHandlerProxy() = default; std::vector<IFlushTarget::SP> diff --git a/searchcore/src/vespa/searchcore/proton/server/flushhandlerproxy.h b/searchcore/src/vespa/searchcore/proton/server/flushhandlerproxy.h index ededfe977e3..530a3997d84 100644 --- a/searchcore/src/vespa/searchcore/proton/server/flushhandlerproxy.h +++ b/searchcore/src/vespa/searchcore/proton/server/flushhandlerproxy.h @@ -3,6 +3,7 @@ #pragma once #include <vespa/searchcore/proton/flushengine/iflushhandler.h> +#include <vespa/searchcore/proton/common/monitored_refcount.h> namespace proton { @@ -12,10 +13,11 @@ class FlushHandlerProxy : public IFlushHandler { private: std::shared_ptr<DocumentDB> _documentDB; + RetainGuard _retainGuard; public: FlushHandlerProxy(const std::shared_ptr<DocumentDB> &documentDB); - virtual ~FlushHandlerProxy(); + ~FlushHandlerProxy() override; /** * Implements IFlushHandler. diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.cpp b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.cpp index 8d3f32729a4..822d190feec 100644 --- a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.cpp @@ -114,6 +114,7 @@ CompactionJob::completeMove(const search::DocumentMetaData & metaThen, std::uniq } CompactionJob::CompactionJob(const DocumentDBLidSpaceCompactionConfig &config, + RetainGuard dbRetainer, std::shared_ptr<ILidSpaceCompactionHandler> handler, IOperationStorer &opStorer, IThreadService & master, @@ -128,6 +129,7 @@ CompactionJob::CompactionJob(const DocumentDBLidSpaceCompactionConfig &config, std::enable_shared_from_this<CompactionJob>(), _master(master), _bucketExecutor(bucketExecutor), + _dbRetainer(std::move(dbRetainer)), _bucketSpace(bucketSpace), _stopped(false) { } diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.h b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.h index 12c0e30c61e..02c9ccb8bcf 100644 --- a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.h +++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.h @@ -3,6 +3,7 @@ #pragma once #include "lid_space_compaction_job_base.h" +#include <vespa/searchcore/proton/common/monitored_refcount.h> #include <vespa/document/bucket/bucketspace.h> #include <atomic> @@ -27,8 +28,9 @@ private: using BucketExecutor = storage::spi::BucketExecutor; using IDestructorCallback = vespalib::IDestructorCallback; using IThreadService = searchcorespi::index::IThreadService; - IThreadService & _master; + IThreadService &_master; BucketExecutor &_bucketExecutor; + RetainGuard _dbRetainer; document::BucketSpace _bucketSpace; std::atomic<bool> _stopped; @@ -41,6 +43,7 @@ private: class MoveTask; CompactionJob(const DocumentDBLidSpaceCompactionConfig &config, + RetainGuard dbRetainer, std::shared_ptr<ILidSpaceCompactionHandler> handler, IOperationStorer &opStorer, IThreadService & master, @@ -53,6 +56,7 @@ private: public: static std::shared_ptr<CompactionJob> create(const DocumentDBLidSpaceCompactionConfig &config, + RetainGuard dbRetainer, std::shared_ptr<ILidSpaceCompactionHandler> handler, IOperationStorer &opStorer, IThreadService & master, @@ -64,8 +68,8 @@ public: document::BucketSpace bucketSpace) { return std::shared_ptr<CompactionJob>( - new CompactionJob(config, std::move(handler), opStorer, master, bucketExecutor, diskMemUsageNotifier, - blockableConfig, clusterStateChangedNotifier, nodeRetired, bucketSpace)); + new CompactionJob(config, std::move(dbRetainer), std::move(handler), opStorer, master, bucketExecutor, + diskMemUsageNotifier, blockableConfig, clusterStateChangedNotifier, nodeRetired, bucketSpace)); } ~CompactionJob() override; }; diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp b/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp index 5b7824e4cc1..7a5a42b5608 100644 --- a/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp @@ -41,10 +41,10 @@ injectLidSpaceCompactionJobs(MaintenanceController &controller, for (auto &lidHandler : lscHandlers) { std::shared_ptr<IMaintenanceJob> job; if (config.getLidSpaceCompactionConfig().useBucketExecutor()) { - job = lidspace::CompactionJob::create(config.getLidSpaceCompactionConfig(), std::move(lidHandler), opStorer, - controller.masterThread(), bucketExecutor, diskMemUsageNotifier, - config.getBlockableJobConfig(), clusterStateChangedNotifier, - (calc ? calc->nodeRetired() : false), bucketSpace); + job = lidspace::CompactionJob::create(config.getLidSpaceCompactionConfig(), controller.retainDB(), + std::move(lidHandler), opStorer, controller.masterThread(), + bucketExecutor, diskMemUsageNotifier,config.getBlockableJobConfig(), + clusterStateChangedNotifier, (calc ? calc->nodeRetired() : false), bucketSpace); } else { job = std::make_shared<LidSpaceCompactionJob>( config.getLidSpaceCompactionConfig(), @@ -76,7 +76,7 @@ injectBucketMoveJob(MaintenanceController &controller, { std::shared_ptr<IMaintenanceJob> bmj; if (config.getBucketMoveConfig().useBucketExecutor()) { - bmj = BucketMoveJobV2::create(calc, moveHandler, bucketModifiedHandler, controller.masterThread(), + bmj = BucketMoveJobV2::create(calc, controller.retainDB(), moveHandler, bucketModifiedHandler, controller.masterThread(), bucketExecutor, controller.getReadySubDB(), controller.getNotReadySubDB(), bucketCreateNotifier, clusterStateChangedNotifier, bucketStateChangedNotifier, diskMemUsageNotifier, config.getBlockableJobConfig(), docTypeName, bucketSpace); diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp index f981ebb1116..3b4526e6f7c 100644 --- a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp @@ -40,10 +40,12 @@ isRunningOrRunnable(const MaintenanceJobRunner & job, const Executor * master) { MaintenanceController::MaintenanceController(IThreadService &masterThread, vespalib::Executor & defaultExecutor, + MonitoredRefCount & refCount, const DocTypeName &docTypeName) : IBucketFreezeListener(), _masterThread(masterThread), _defaultExecutor(defaultExecutor), + _refCount(refCount), _readySubDB(), _remSubDB(), _notReadySubDB(), diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h index 0e080cf6445..6415c51eeed 100644 --- a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h +++ b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h @@ -7,8 +7,10 @@ #include "frozenbuckets.h" #include "ibucketfreezelistener.h" #include <vespa/searchcore/proton/common/doctypename.h> -#include <mutex> +#include <vespa/searchcore/proton/common/monitored_refcount.h> #include <vespa/vespalib/util/scheduledexecutor.h> +#include <mutex> + namespace vespalib { class Timer; @@ -20,6 +22,7 @@ namespace proton { class MaintenanceJobRunner; class DocumentDBMaintenanceConfig; +class MonitoredRefCount; /** * Class that controls the bucket moving between ready and notready sub databases @@ -35,7 +38,7 @@ public: using UP = std::unique_ptr<MaintenanceController>; enum class State {INITIALIZING, STARTED, PAUSED, STOPPING}; - MaintenanceController(IThreadService &masterThread, vespalib::Executor & defaultExecutor, const DocTypeName &docTypeName); + MaintenanceController(IThreadService &masterThread, vespalib::Executor & defaultExecutor, MonitoredRefCount & refCount, const DocTypeName &docTypeName); ~MaintenanceController() override; void registerJobInMasterThread(IMaintenanceJob::UP job); @@ -73,12 +76,14 @@ public: const MaintenanceDocumentSubDB & getNotReadySubDB() const { return _notReadySubDB; } IThreadService & masterThread() { return _masterThread; } const DocTypeName & getDocTypeName() const { return _docTypeName; } + RetainGuard retainDB() { return RetainGuard(_refCount); } private: using Mutex = std::mutex; using Guard = std::lock_guard<Mutex>; IThreadService &_masterThread; vespalib::Executor &_defaultExecutor; + MonitoredRefCount &_refCount; MaintenanceDocumentSubDB _readySubDB; MaintenanceDocumentSubDB _remSubDB; MaintenanceDocumentSubDB _notReadySubDB; diff --git a/searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.cpp b/searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.cpp index c2f80f78b8a..d298c0fac24 100644 --- a/searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.cpp @@ -21,15 +21,11 @@ PersistenceHandlerProxy::PersistenceHandlerProxy(DocumentDB::SP documentDB) : _documentDB(std::move(documentDB)), _feedHandler(_documentDB->getFeedHandler()), _bucketHandler(_documentDB->getBucketHandler()), - _clusterStateHandler(_documentDB->getClusterStateHandler()) -{ - _documentDB->retain(); -} + _clusterStateHandler(_documentDB->getClusterStateHandler()), + _retainGuard(_documentDB->retain()) +{ } -PersistenceHandlerProxy::~PersistenceHandlerProxy() -{ - _documentDB->release(); -} +PersistenceHandlerProxy::~PersistenceHandlerProxy() = default; void PersistenceHandlerProxy::initialize() diff --git a/searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.h b/searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.h index ee6e8a7ee42..ce95cc3bddd 100644 --- a/searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.h +++ b/searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.h @@ -3,6 +3,7 @@ #pragma once #include <vespa/searchcore/proton/persistenceengine/ipersistencehandler.h> +#include <vespa/searchcore/proton/common/monitored_refcount.h> namespace proton { @@ -13,10 +14,11 @@ class ClusterStateHandler; class PersistenceHandlerProxy : public IPersistenceHandler { private: - std::shared_ptr<DocumentDB> _documentDB; - FeedHandler &_feedHandler; - BucketHandler &_bucketHandler; - ClusterStateHandler &_clusterStateHandler; + std::shared_ptr<DocumentDB> _documentDB; + FeedHandler &_feedHandler; + BucketHandler &_bucketHandler; + ClusterStateHandler &_clusterStateHandler; + RetainGuard _retainGuard; public: explicit PersistenceHandlerProxy(std::shared_ptr<DocumentDB> documentDB); diff --git a/searchcore/src/vespa/searchcore/proton/server/searchhandlerproxy.cpp b/searchcore/src/vespa/searchcore/proton/server/searchhandlerproxy.cpp index 6da6c09cdba..2c1133ad1d6 100644 --- a/searchcore/src/vespa/searchcore/proton/server/searchhandlerproxy.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/searchhandlerproxy.cpp @@ -8,15 +8,11 @@ namespace proton { SearchHandlerProxy::SearchHandlerProxy(DocumentDB::SP documentDB) - : _documentDB(std::move(documentDB)) -{ - _documentDB->retain(); -} + : _documentDB(std::move(documentDB)), + _retainGuard(_documentDB->retain()) +{ } -SearchHandlerProxy::~SearchHandlerProxy() -{ - _documentDB->release(); -} +SearchHandlerProxy::~SearchHandlerProxy() = default; std::unique_ptr<search::engine::DocsumReply> SearchHandlerProxy::getDocsums(const DocsumRequest & request) diff --git a/searchcore/src/vespa/searchcore/proton/server/searchhandlerproxy.h b/searchcore/src/vespa/searchcore/proton/server/searchhandlerproxy.h index fc4f517fb36..d6690fda47e 100644 --- a/searchcore/src/vespa/searchcore/proton/server/searchhandlerproxy.h +++ b/searchcore/src/vespa/searchcore/proton/server/searchhandlerproxy.h @@ -3,6 +3,7 @@ #pragma once #include <vespa/searchcore/proton/summaryengine/isearchhandler.h> +#include <vespa/searchcore/proton/common/monitored_refcount.h> namespace proton { @@ -12,6 +13,7 @@ class SearchHandlerProxy : public ISearchHandler { private: std::shared_ptr<DocumentDB> _documentDB; + RetainGuard _retainGuard; public: SearchHandlerProxy(std::shared_ptr<DocumentDB> documentDB); |