aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2021-04-23 16:44:52 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2021-04-23 16:44:52 +0000
commit4e33dbc5e9ab9e8a2269bd5bd0e06e1527313782 (patch)
treecb46751988090040bcba5ca76cd85567f28ebf6c
parentfa9f44a2662e73b3ce97ea83a4ed32dd3baf680f (diff)
Use a RetainGuard to ensure DocumentDB is not closed until everything has been drained out.
-rw-r--r--searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_v2_test.cpp3
-rw-r--r--searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.cpp2
-rw-r--r--searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.h2
-rw-r--r--searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp5
-rw-r--r--searchcore/src/vespa/searchcore/proton/common/monitored_refcount.cpp4
-rw-r--r--searchcore/src/vespa/searchcore/proton/common/monitored_refcount.h38
-rw-r--r--searchcore/src/vespa/searchcore/proton/reference/document_db_reference_resolver.cpp9
-rw-r--r--searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_listener.cpp10
-rw-r--r--searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_listener.h4
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h7
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/documentdb.cpp15
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/documentdb.h3
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/flushhandlerproxy.cpp12
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/flushhandlerproxy.h4
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.cpp2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.h10
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp10
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h9
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.cpp12
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.h10
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/searchhandlerproxy.cpp12
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/searchhandlerproxy.h2
24 files changed, 115 insertions, 74 deletions
diff --git a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_v2_test.cpp b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_v2_test.cpp
index 450fa7e8318..e844047f686 100644
--- a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_v2_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_v2_test.cpp
@@ -42,6 +42,7 @@ struct ControllerFixtureBase : public ::testing::Test
DummyBucketExecutor _bucketExecutor;
MyMoveHandler _moveHandler;
DocumentDBTaggedMetrics _metrics;
+ MonitoredRefCount _refCount;
std::shared_ptr<BucketMoveJobV2> _bmj;
MyCountJobRunner _runner;
ControllerFixtureBase(const BlockableMaintenanceJobConfig &blockableConfig, bool storeMoveDoneContexts);
@@ -123,7 +124,7 @@ ControllerFixtureBase::ControllerFixtureBase(const BlockableMaintenanceJobConfig
_bucketExecutor(4),
_moveHandler(*_bucketDB, storeMoveDoneContexts),
_metrics("test", 1),
- _bmj(BucketMoveJobV2::create(_calc, _moveHandler, _modifiedHandler, _master, _bucketExecutor, _ready._subDb,
+ _bmj(BucketMoveJobV2::create(_calc, RetainGuard(_refCount), _moveHandler, _modifiedHandler, _master, _bucketExecutor, _ready._subDb,
_notReady._subDb, _bucketCreateNotifier,_clusterStateHandler, _bucketHandler,
_diskMemUsageNotifier, blockableConfig, "test", makeBucketSpace())),
_runner(*_bmj)
diff --git a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.cpp b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.cpp
index f592aeab9d2..8386385e7b2 100644
--- a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.cpp
+++ b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.cpp
@@ -56,7 +56,7 @@ JobTestBase::init(uint32_t allowedLidBloat,
_singleExecutor = std::make_unique<vespalib::ThreadStackExecutor>(1, 0x10000);
_master = std::make_unique<proton::ExecutorThreadService> (*_singleExecutor);
_bucketExecutor = std::make_unique<storage::spi::dummy::DummyBucketExecutor>(4);
- _job = lidspace::CompactionJob::create(compactCfg, _handler, _storer, *_master, *_bucketExecutor,
+ _job = lidspace::CompactionJob::create(compactCfg, RetainGuard(_refCount), _handler, _storer, *_master, *_bucketExecutor,
_diskMemUsageNotifier, blockableCfg, _clusterStateHandler, nodeRetired,
document::BucketSpace::placeHolder());
} else {
diff --git a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.h b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.h
index 0a4e5c56acb..e74167db189 100644
--- a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.h
+++ b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.h
@@ -2,6 +2,7 @@
#include "lid_space_common.h"
#include <vespa/searchcore/proton/server/blockable_maintenance_job.h>
+#include <vespa/searchcore/proton/common/monitored_refcount.h>
#include <vespa/persistence/spi/bucketexecutor.h>
#include <vespa/searchcorespi/index/i_thread_service.h>
#include <vespa/vespalib/gtest/gtest.h>
@@ -16,6 +17,7 @@ struct JobTestBase : public ::testing::TestWithParam<bool> {
MyFrozenBucketHandler _frozenHandler;
test::DiskMemUsageNotifier _diskMemUsageNotifier;
test::ClusterStateHandler _clusterStateHandler;
+ MonitoredRefCount _refCount;
std::shared_ptr<BlockableMaintenanceJob> _job;
JobTestBase();
~JobTestBase() override;
diff --git a/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp b/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp
index 7b225571227..bebdeb2a701 100644
--- a/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp
@@ -6,6 +6,7 @@
#include <vespa/searchcore/proton/attribute/i_attribute_manager.h>
#include <vespa/searchcore/proton/bucketdb/bucket_create_notifier.h>
#include <vespa/searchcore/proton/common/doctypename.h>
+#include <vespa/searchcore/proton/common/monitored_refcount.h>
#include <vespa/searchcore/proton/common/transient_resource_usage_provider.h>
#include <vespa/searchcore/proton/documentmetastore/operation_listener.h>
#include <vespa/searchcore/proton/documentmetastore/documentmetastore.h>
@@ -376,6 +377,7 @@ public:
AttributeUsageFilter _attributeUsageFilter;
test::DiskMemUsageNotifier _diskMemUsageNotifier;
BucketCreateNotifier _bucketCreateNotifier;
+ MonitoredRefCount _refCount;
MaintenanceController _mc;
MaintenanceControllerFixture();
@@ -794,7 +796,8 @@ MaintenanceControllerFixture::MaintenanceControllerFixture()
_notReadyAttributeManager(std::make_shared<MyAttributeManager>()),
_attributeUsageFilter(),
_bucketCreateNotifier(),
- _mc(_threadService, _genericExecutor, _docTypeName)
+ _refCount(),
+ _mc(_threadService, _genericExecutor, _refCount, _docTypeName)
{
std::vector<MyDocumentSubDB *> subDBs;
subDBs.push_back(&_ready);
diff --git a/searchcore/src/vespa/searchcore/proton/common/monitored_refcount.cpp b/searchcore/src/vespa/searchcore/proton/common/monitored_refcount.cpp
index 3cecc8308ad..6b35528318a 100644
--- a/searchcore/src/vespa/searchcore/proton/common/monitored_refcount.cpp
+++ b/searchcore/src/vespa/searchcore/proton/common/monitored_refcount.cpp
@@ -18,14 +18,14 @@ MonitoredRefCount::~MonitoredRefCount()
}
void
-MonitoredRefCount::retain()
+MonitoredRefCount::retain() noexcept
{
std::lock_guard<std::mutex> guard(_lock);
++_refCount;
}
void
-MonitoredRefCount::release()
+MonitoredRefCount::release() noexcept
{
std::lock_guard<std::mutex> guard(_lock);
--_refCount;
diff --git a/searchcore/src/vespa/searchcore/proton/common/monitored_refcount.h b/searchcore/src/vespa/searchcore/proton/common/monitored_refcount.h
index 44413814f51..3559278c70f 100644
--- a/searchcore/src/vespa/searchcore/proton/common/monitored_refcount.h
+++ b/searchcore/src/vespa/searchcore/proton/common/monitored_refcount.h
@@ -6,6 +6,7 @@
namespace proton {
+class RetainGuard;
/*
* Class containing a reference count that can be waited on to become zero.
* Typically ancestor or member of a class that has to be careful of when
@@ -16,13 +17,44 @@ class MonitoredRefCount
std::mutex _lock;
std::condition_variable _cv;
uint32_t _refCount;
-
+ void retain() noexcept;
+ void release() noexcept;
+ friend RetainGuard;
public:
MonitoredRefCount();
virtual ~MonitoredRefCount();
- void retain();
- void release();
void waitForZeroRefCount();
};
+class RetainGuard {
+public:
+ RetainGuard(MonitoredRefCount & refCount) noexcept
+ : _refCount(&refCount)
+ {
+ _refCount->retain();
+ }
+ RetainGuard(const RetainGuard & rhs) = delete;
+ RetainGuard & operator=(const RetainGuard & rhs) = delete;
+ RetainGuard(RetainGuard && rhs) noexcept
+ : _refCount(rhs._refCount)
+ {
+ rhs._refCount = nullptr;
+ }
+ RetainGuard & operator=(RetainGuard && rhs) noexcept {
+ release();
+ _refCount = rhs._refCount;
+ rhs._refCount = nullptr;
+ return *this;
+ }
+ ~RetainGuard() { release(); }
+private:
+ void release() noexcept{
+ if (_refCount != nullptr) {
+ _refCount->release();
+ _refCount = nullptr;
+ }
+ }
+ MonitoredRefCount * _refCount;
+};
+
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/reference/document_db_reference_resolver.cpp b/searchcore/src/vespa/searchcore/proton/reference/document_db_reference_resolver.cpp
index fa531385d52..5ffd474834d 100644
--- a/searchcore/src/vespa/searchcore/proton/reference/document_db_reference_resolver.cpp
+++ b/searchcore/src/vespa/searchcore/proton/reference/document_db_reference_resolver.cpp
@@ -125,11 +125,9 @@ DocumentDBReferenceResolver::listenToGidToLidChanges(const IAttributeManager &at
auto &attr = *attrSP;
vespalib::string docTypeName = getTargetDocTypeName(attr.getName(), _thisDocType);
GidToLidChangeRegistrator &registrator = getRegistrator(docTypeName);
- auto listener = std::make_unique<GidToLidChangeListener>(_attributeFieldWriter,
- attrSP,
- _refCount,
- attr.getName(),
- _thisDocType.getName());
+ auto listener = std::make_unique<GidToLidChangeListener>(_attributeFieldWriter, attrSP,
+ RetainGuard(_refCount),
+ attr.getName(), _thisDocType.getName());
registrator.addListener(std::move(listener));
}
}
@@ -159,7 +157,6 @@ DocumentDBReferenceResolver::DocumentDBReferenceResolver(const IDocumentDBRefere
const DocumentType &thisDocType,
const ImportedFieldsConfig &importedFieldsCfg,
const document::DocumentType &prevThisDocType,
-
MonitoredRefCount &refCount,
ISequencedTaskExecutor &attributeFieldWriter,
bool useReferences)
diff --git a/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_listener.cpp b/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_listener.cpp
index c54346defd5..cc26aad5f5b 100644
--- a/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_listener.cpp
+++ b/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_listener.cpp
@@ -7,22 +7,20 @@ namespace proton {
GidToLidChangeListener::GidToLidChangeListener(vespalib::ISequencedTaskExecutor &attributeFieldWriter,
std::shared_ptr<search::attribute::ReferenceAttribute> attr,
- MonitoredRefCount &refCount,
+ RetainGuard retainGuard,
const vespalib::string &name,
const vespalib::string &docTypeName)
: _attributeFieldWriter(attributeFieldWriter),
_executorId(_attributeFieldWriter.getExecutorIdFromName(attr->getNamePrefix())),
_attr(std::move(attr)),
- _refCount(refCount),
+ _retainGuard(std::move(retainGuard)),
_name(name),
_docTypeName(docTypeName)
-{
- _refCount.retain();
-}
+{ }
+
GidToLidChangeListener::~GidToLidChangeListener()
{
_attributeFieldWriter.sync();
- _refCount.release();
}
void
diff --git a/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_listener.h b/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_listener.h
index ae53e674c24..0a1026319a3 100644
--- a/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_listener.h
+++ b/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_listener.h
@@ -18,14 +18,14 @@ class GidToLidChangeListener : public IGidToLidChangeListener
vespalib::ISequencedTaskExecutor &_attributeFieldWriter;
vespalib::ISequencedTaskExecutor::ExecutorId _executorId;
std::shared_ptr<search::attribute::ReferenceAttribute> _attr;
- MonitoredRefCount &_refCount;
+ RetainGuard _retainGuard;
vespalib::string _name;
vespalib::string _docTypeName;
public:
GidToLidChangeListener(vespalib::ISequencedTaskExecutor &attributeFieldWriter,
std::shared_ptr<search::attribute::ReferenceAttribute> attr,
- MonitoredRefCount &refCount,
+ RetainGuard refCount,
const vespalib::string &name,
const vespalib::string &docTypeName);
~GidToLidChangeListener() override;
diff --git a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp
index 6cd925d1b3d..7d26644944a 100644
--- a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp
@@ -54,6 +54,7 @@ blockedDueToClusterState(const std::shared_ptr<IBucketStateCalculator> &calc)
}
BucketMoveJobV2::BucketMoveJobV2(const std::shared_ptr<IBucketStateCalculator> &calc,
+ RetainGuard dbRetainer,
IDocumentMoveHandler &moveHandler,
IBucketModifiedHandler &modifiedHandler,
IThreadService & master,
@@ -74,6 +75,7 @@ BucketMoveJobV2::BucketMoveJobV2(const std::shared_ptr<IBucketStateCalculator> &
IDiskMemUsageListener(),
std::enable_shared_from_this<BucketMoveJobV2>(),
_calc(calc),
+ _dbRetainer(std::move(dbRetainer)),
_moveHandler(moveHandler),
_modifiedHandler(modifiedHandler),
_master(master),
diff --git a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h
index 736ac3a8f40..0f873c8b290 100644
--- a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h
+++ b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h
@@ -10,6 +10,8 @@
#include "maintenancedocumentsubdb.h"
#include <vespa/searchcore/proton/bucketdb/bucketscaniterator.h>
#include <vespa/searchcore/proton/bucketdb/i_bucket_create_listener.h>
+#include <vespa/searchcore/proton/common/monitored_refcount.h>
+
namespace storage::spi { struct BucketExecutor; }
namespace searchcorespi::index { struct IThreadService; }
@@ -57,6 +59,7 @@ private:
using Movers = std::vector<BucketMoverSP>;
using GuardedMoveOps = BucketMover::GuardedMoveOps;
std::shared_ptr<IBucketStateCalculator> _calc;
+ RetainGuard _dbRetainer;
IDocumentMoveHandler &_moveHandler;
IBucketModifiedHandler &_modifiedHandler;
IThreadService &_master;
@@ -78,6 +81,7 @@ private:
IDiskMemUsageNotifier &_diskMemUsageNotifier;
BucketMoveJobV2(const std::shared_ptr<IBucketStateCalculator> &calc,
+ RetainGuard dbRetainer,
IDocumentMoveHandler &moveHandler,
IBucketModifiedHandler &modifiedHandler,
IThreadService & master,
@@ -112,6 +116,7 @@ private:
public:
static std::shared_ptr<BucketMoveJobV2>
create(const std::shared_ptr<IBucketStateCalculator> &calc,
+ RetainGuard dbRetainer,
IDocumentMoveHandler &moveHandler,
IBucketModifiedHandler &modifiedHandler,
IThreadService & master,
@@ -127,7 +132,7 @@ public:
document::BucketSpace bucketSpace)
{
return std::shared_ptr<BucketMoveJobV2>(
- new BucketMoveJobV2(calc, moveHandler, modifiedHandler, master, bucketExecutor, ready, notReady,
+ new BucketMoveJobV2(calc, std::move(dbRetainer), moveHandler, modifiedHandler, master, bucketExecutor, ready, notReady,
bucketCreateNotifier, clusterStateChangedNotifier, bucketStateChangedNotifier,
diskMemUsageNotifier, blockableConfig, docTypeName, bucketSpace));
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp
index 549981330ef..3f3fad55bd6 100644
--- a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp
@@ -198,7 +198,7 @@ DocumentDB::DocumentDB(const vespalib::string &baseDir,
metricsWireService, getMetrics(), queryLimiter, clock, _configMutex, _baseDir,
DocumentSubDBCollection::Config(protonCfg.numsearcherthreads),
hwInfo),
- _maintenanceController(_writeService.master(), sharedExecutor, _docTypeName),
+ _maintenanceController(_writeService.master(), sharedExecutor, _refCount, _docTypeName),
_jobTrackers(),
_calc(),
_metricsUpdater(_subDBs, _writeService, _jobTrackers, *_sessionManager, _writeFilter)
@@ -408,8 +408,8 @@ DocumentDB::applySubDBConfig(const DocumentDBConfig &newConfigSnapshot,
auto newRepo = newConfigSnapshot.getDocumentTypeRepoSP();
auto newDocType = newRepo->getDocumentType(_docTypeName.getName());
assert(newDocType != nullptr);
- DocumentDBReferenceResolver resolver(*registry, *newDocType, newConfigSnapshot.getImportedFieldsConfig(),
- *oldDocType, _refCount, _writeService.attributeFieldWriter(), _state.getAllowReconfig());
+ DocumentDBReferenceResolver resolver(*registry, *newDocType, newConfigSnapshot.getImportedFieldsConfig(), *oldDocType,
+ _refCount, _writeService.attributeFieldWriter(), _state.getAllowReconfig());
_subDBs.applyConfig(newConfigSnapshot, *_activeConfigSnapshot, serialNum, params, resolver);
}
@@ -551,13 +551,8 @@ DocumentDB::tearDownReferences()
auto repo = activeConfig->getDocumentTypeRepoSP();
auto docType = repo->getDocumentType(_docTypeName.getName());
assert(docType != nullptr);
- DocumentDBReferenceResolver resolver(*registry,
- *docType,
- activeConfig->getImportedFieldsConfig(),
- *docType,
- _refCount,
- _writeService.attributeFieldWriter(),
- false);
+ DocumentDBReferenceResolver resolver(*registry, *docType, activeConfig->getImportedFieldsConfig(), *docType,
+ _refCount, _writeService.attributeFieldWriter(), false);
_subDBs.tearDownReferences(resolver);
registry->remove(_docTypeName.getName());
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.h b/searchcore/src/vespa/searchcore/proton/server/documentdb.h
index b1ba818e03f..ad0225d6f86 100644
--- a/searchcore/src/vespa/searchcore/proton/server/documentdb.h
+++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.h
@@ -385,8 +385,7 @@ public:
/**
* Reference counting
*/
- void retain() { _refCount.retain(); }
- void release() { _refCount.release(); }
+ RetainGuard retain() { return RetainGuard(_refCount); }
bool getDelayedConfig() const { return _state.getDelayedConfig(); }
void replayConfig(SerialNum serialNum) override;
diff --git a/searchcore/src/vespa/searchcore/proton/server/flushhandlerproxy.cpp b/searchcore/src/vespa/searchcore/proton/server/flushhandlerproxy.cpp
index 0178dec86e4..e205700143b 100644
--- a/searchcore/src/vespa/searchcore/proton/server/flushhandlerproxy.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/flushhandlerproxy.cpp
@@ -9,16 +9,12 @@ namespace proton {
FlushHandlerProxy::FlushHandlerProxy(const DocumentDB::SP &documentDB)
: IFlushHandler(documentDB->getDocTypeName().toString()),
- _documentDB(documentDB)
-{
- _documentDB->retain();
-}
+ _documentDB(documentDB),
+ _retainGuard(_documentDB->retain())
+{ }
-FlushHandlerProxy::~FlushHandlerProxy()
-{
- _documentDB->release();
-}
+FlushHandlerProxy::~FlushHandlerProxy() = default;
std::vector<IFlushTarget::SP>
diff --git a/searchcore/src/vespa/searchcore/proton/server/flushhandlerproxy.h b/searchcore/src/vespa/searchcore/proton/server/flushhandlerproxy.h
index ededfe977e3..530a3997d84 100644
--- a/searchcore/src/vespa/searchcore/proton/server/flushhandlerproxy.h
+++ b/searchcore/src/vespa/searchcore/proton/server/flushhandlerproxy.h
@@ -3,6 +3,7 @@
#pragma once
#include <vespa/searchcore/proton/flushengine/iflushhandler.h>
+#include <vespa/searchcore/proton/common/monitored_refcount.h>
namespace proton {
@@ -12,10 +13,11 @@ class FlushHandlerProxy : public IFlushHandler
{
private:
std::shared_ptr<DocumentDB> _documentDB;
+ RetainGuard _retainGuard;
public:
FlushHandlerProxy(const std::shared_ptr<DocumentDB> &documentDB);
- virtual ~FlushHandlerProxy();
+ ~FlushHandlerProxy() override;
/**
* Implements IFlushHandler.
diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.cpp b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.cpp
index 8d3f32729a4..822d190feec 100644
--- a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.cpp
@@ -114,6 +114,7 @@ CompactionJob::completeMove(const search::DocumentMetaData & metaThen, std::uniq
}
CompactionJob::CompactionJob(const DocumentDBLidSpaceCompactionConfig &config,
+ RetainGuard dbRetainer,
std::shared_ptr<ILidSpaceCompactionHandler> handler,
IOperationStorer &opStorer,
IThreadService & master,
@@ -128,6 +129,7 @@ CompactionJob::CompactionJob(const DocumentDBLidSpaceCompactionConfig &config,
std::enable_shared_from_this<CompactionJob>(),
_master(master),
_bucketExecutor(bucketExecutor),
+ _dbRetainer(std::move(dbRetainer)),
_bucketSpace(bucketSpace),
_stopped(false)
{ }
diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.h b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.h
index 12c0e30c61e..02c9ccb8bcf 100644
--- a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.h
+++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.h
@@ -3,6 +3,7 @@
#pragma once
#include "lid_space_compaction_job_base.h"
+#include <vespa/searchcore/proton/common/monitored_refcount.h>
#include <vespa/document/bucket/bucketspace.h>
#include <atomic>
@@ -27,8 +28,9 @@ private:
using BucketExecutor = storage::spi::BucketExecutor;
using IDestructorCallback = vespalib::IDestructorCallback;
using IThreadService = searchcorespi::index::IThreadService;
- IThreadService & _master;
+ IThreadService &_master;
BucketExecutor &_bucketExecutor;
+ RetainGuard _dbRetainer;
document::BucketSpace _bucketSpace;
std::atomic<bool> _stopped;
@@ -41,6 +43,7 @@ private:
class MoveTask;
CompactionJob(const DocumentDBLidSpaceCompactionConfig &config,
+ RetainGuard dbRetainer,
std::shared_ptr<ILidSpaceCompactionHandler> handler,
IOperationStorer &opStorer,
IThreadService & master,
@@ -53,6 +56,7 @@ private:
public:
static std::shared_ptr<CompactionJob>
create(const DocumentDBLidSpaceCompactionConfig &config,
+ RetainGuard dbRetainer,
std::shared_ptr<ILidSpaceCompactionHandler> handler,
IOperationStorer &opStorer,
IThreadService & master,
@@ -64,8 +68,8 @@ public:
document::BucketSpace bucketSpace)
{
return std::shared_ptr<CompactionJob>(
- new CompactionJob(config, std::move(handler), opStorer, master, bucketExecutor, diskMemUsageNotifier,
- blockableConfig, clusterStateChangedNotifier, nodeRetired, bucketSpace));
+ new CompactionJob(config, std::move(dbRetainer), std::move(handler), opStorer, master, bucketExecutor,
+ diskMemUsageNotifier, blockableConfig, clusterStateChangedNotifier, nodeRetired, bucketSpace));
}
~CompactionJob() override;
};
diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp b/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp
index 5b7824e4cc1..7a5a42b5608 100644
--- a/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp
@@ -41,10 +41,10 @@ injectLidSpaceCompactionJobs(MaintenanceController &controller,
for (auto &lidHandler : lscHandlers) {
std::shared_ptr<IMaintenanceJob> job;
if (config.getLidSpaceCompactionConfig().useBucketExecutor()) {
- job = lidspace::CompactionJob::create(config.getLidSpaceCompactionConfig(), std::move(lidHandler), opStorer,
- controller.masterThread(), bucketExecutor, diskMemUsageNotifier,
- config.getBlockableJobConfig(), clusterStateChangedNotifier,
- (calc ? calc->nodeRetired() : false), bucketSpace);
+ job = lidspace::CompactionJob::create(config.getLidSpaceCompactionConfig(), controller.retainDB(),
+ std::move(lidHandler), opStorer, controller.masterThread(),
+ bucketExecutor, diskMemUsageNotifier,config.getBlockableJobConfig(),
+ clusterStateChangedNotifier, (calc ? calc->nodeRetired() : false), bucketSpace);
} else {
job = std::make_shared<LidSpaceCompactionJob>(
config.getLidSpaceCompactionConfig(),
@@ -76,7 +76,7 @@ injectBucketMoveJob(MaintenanceController &controller,
{
std::shared_ptr<IMaintenanceJob> bmj;
if (config.getBucketMoveConfig().useBucketExecutor()) {
- bmj = BucketMoveJobV2::create(calc, moveHandler, bucketModifiedHandler, controller.masterThread(),
+ bmj = BucketMoveJobV2::create(calc, controller.retainDB(), moveHandler, bucketModifiedHandler, controller.masterThread(),
bucketExecutor, controller.getReadySubDB(), controller.getNotReadySubDB(),
bucketCreateNotifier, clusterStateChangedNotifier, bucketStateChangedNotifier,
diskMemUsageNotifier, config.getBlockableJobConfig(), docTypeName, bucketSpace);
diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp
index f981ebb1116..3b4526e6f7c 100644
--- a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp
@@ -40,10 +40,12 @@ isRunningOrRunnable(const MaintenanceJobRunner & job, const Executor * master) {
MaintenanceController::MaintenanceController(IThreadService &masterThread,
vespalib::Executor & defaultExecutor,
+ MonitoredRefCount & refCount,
const DocTypeName &docTypeName)
: IBucketFreezeListener(),
_masterThread(masterThread),
_defaultExecutor(defaultExecutor),
+ _refCount(refCount),
_readySubDB(),
_remSubDB(),
_notReadySubDB(),
diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h
index 0e080cf6445..6415c51eeed 100644
--- a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h
+++ b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h
@@ -7,8 +7,10 @@
#include "frozenbuckets.h"
#include "ibucketfreezelistener.h"
#include <vespa/searchcore/proton/common/doctypename.h>
-#include <mutex>
+#include <vespa/searchcore/proton/common/monitored_refcount.h>
#include <vespa/vespalib/util/scheduledexecutor.h>
+#include <mutex>
+
namespace vespalib {
class Timer;
@@ -20,6 +22,7 @@ namespace proton {
class MaintenanceJobRunner;
class DocumentDBMaintenanceConfig;
+class MonitoredRefCount;
/**
* Class that controls the bucket moving between ready and notready sub databases
@@ -35,7 +38,7 @@ public:
using UP = std::unique_ptr<MaintenanceController>;
enum class State {INITIALIZING, STARTED, PAUSED, STOPPING};
- MaintenanceController(IThreadService &masterThread, vespalib::Executor & defaultExecutor, const DocTypeName &docTypeName);
+ MaintenanceController(IThreadService &masterThread, vespalib::Executor & defaultExecutor, MonitoredRefCount & refCount, const DocTypeName &docTypeName);
~MaintenanceController() override;
void registerJobInMasterThread(IMaintenanceJob::UP job);
@@ -73,12 +76,14 @@ public:
const MaintenanceDocumentSubDB & getNotReadySubDB() const { return _notReadySubDB; }
IThreadService & masterThread() { return _masterThread; }
const DocTypeName & getDocTypeName() const { return _docTypeName; }
+ RetainGuard retainDB() { return RetainGuard(_refCount); }
private:
using Mutex = std::mutex;
using Guard = std::lock_guard<Mutex>;
IThreadService &_masterThread;
vespalib::Executor &_defaultExecutor;
+ MonitoredRefCount &_refCount;
MaintenanceDocumentSubDB _readySubDB;
MaintenanceDocumentSubDB _remSubDB;
MaintenanceDocumentSubDB _notReadySubDB;
diff --git a/searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.cpp b/searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.cpp
index c2f80f78b8a..d298c0fac24 100644
--- a/searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.cpp
@@ -21,15 +21,11 @@ PersistenceHandlerProxy::PersistenceHandlerProxy(DocumentDB::SP documentDB)
: _documentDB(std::move(documentDB)),
_feedHandler(_documentDB->getFeedHandler()),
_bucketHandler(_documentDB->getBucketHandler()),
- _clusterStateHandler(_documentDB->getClusterStateHandler())
-{
- _documentDB->retain();
-}
+ _clusterStateHandler(_documentDB->getClusterStateHandler()),
+ _retainGuard(_documentDB->retain())
+{ }
-PersistenceHandlerProxy::~PersistenceHandlerProxy()
-{
- _documentDB->release();
-}
+PersistenceHandlerProxy::~PersistenceHandlerProxy() = default;
void
PersistenceHandlerProxy::initialize()
diff --git a/searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.h b/searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.h
index ee6e8a7ee42..ce95cc3bddd 100644
--- a/searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.h
+++ b/searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.h
@@ -3,6 +3,7 @@
#pragma once
#include <vespa/searchcore/proton/persistenceengine/ipersistencehandler.h>
+#include <vespa/searchcore/proton/common/monitored_refcount.h>
namespace proton {
@@ -13,10 +14,11 @@ class ClusterStateHandler;
class PersistenceHandlerProxy : public IPersistenceHandler
{
private:
- std::shared_ptr<DocumentDB> _documentDB;
- FeedHandler &_feedHandler;
- BucketHandler &_bucketHandler;
- ClusterStateHandler &_clusterStateHandler;
+ std::shared_ptr<DocumentDB> _documentDB;
+ FeedHandler &_feedHandler;
+ BucketHandler &_bucketHandler;
+ ClusterStateHandler &_clusterStateHandler;
+ RetainGuard _retainGuard;
public:
explicit PersistenceHandlerProxy(std::shared_ptr<DocumentDB> documentDB);
diff --git a/searchcore/src/vespa/searchcore/proton/server/searchhandlerproxy.cpp b/searchcore/src/vespa/searchcore/proton/server/searchhandlerproxy.cpp
index 6da6c09cdba..2c1133ad1d6 100644
--- a/searchcore/src/vespa/searchcore/proton/server/searchhandlerproxy.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/searchhandlerproxy.cpp
@@ -8,15 +8,11 @@
namespace proton {
SearchHandlerProxy::SearchHandlerProxy(DocumentDB::SP documentDB)
- : _documentDB(std::move(documentDB))
-{
- _documentDB->retain();
-}
+ : _documentDB(std::move(documentDB)),
+ _retainGuard(_documentDB->retain())
+{ }
-SearchHandlerProxy::~SearchHandlerProxy()
-{
- _documentDB->release();
-}
+SearchHandlerProxy::~SearchHandlerProxy() = default;
std::unique_ptr<search::engine::DocsumReply>
SearchHandlerProxy::getDocsums(const DocsumRequest & request)
diff --git a/searchcore/src/vespa/searchcore/proton/server/searchhandlerproxy.h b/searchcore/src/vespa/searchcore/proton/server/searchhandlerproxy.h
index fc4f517fb36..d6690fda47e 100644
--- a/searchcore/src/vespa/searchcore/proton/server/searchhandlerproxy.h
+++ b/searchcore/src/vespa/searchcore/proton/server/searchhandlerproxy.h
@@ -3,6 +3,7 @@
#pragma once
#include <vespa/searchcore/proton/summaryengine/isearchhandler.h>
+#include <vespa/searchcore/proton/common/monitored_refcount.h>
namespace proton {
@@ -12,6 +13,7 @@ class SearchHandlerProxy : public ISearchHandler
{
private:
std::shared_ptr<DocumentDB> _documentDB;
+ RetainGuard _retainGuard;
public:
SearchHandlerProxy(std::shared_ptr<DocumentDB> documentDB);