diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2021-11-26 14:53:37 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-11-26 14:53:37 +0100 |
commit | 6d296cbc0320e5eac9c8a1e31819f6753ac7fce8 (patch) | |
tree | 65d107a4f03ec1b35fd6fafeb780b2290ef70c2f | |
parent | e7fc5ae35312770c01784b1ea1b7a2d6cf3e70bc (diff) | |
parent | 13641d108f2022d3885c5aa3e0ce19a8d764e085 (diff) |
Merge pull request #20243 from vespa-engine/balder/reduce-use-of-syncable
Remove the need for Syncable
13 files changed, 57 insertions, 44 deletions
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp index 15b5bf81670..ed2ce3d638e 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp @@ -425,8 +425,7 @@ RPCNetwork::shutdown() { _transport->ShutDown(true); _threadPool->Close(); - _executor->shutdown(); - _executor->sync(); + _executor->shutdown().sync(); } void diff --git a/searchcore/src/tests/proton/documentdb/document_subdbs/document_subdbs_test.cpp b/searchcore/src/tests/proton/documentdb/document_subdbs/document_subdbs_test.cpp index e3e6ac6321e..b80cd08ae8e 100644 --- a/searchcore/src/tests/proton/documentdb/document_subdbs/document_subdbs_test.cpp +++ b/searchcore/src/tests/proton/documentdb/document_subdbs/document_subdbs_test.cpp @@ -234,8 +234,7 @@ MySearchableContext::MySearchableContext(IThreadingService &writeService, IBucketDBHandlerInitializer & bucketDBHandlerInitializer) : _fastUpdCtx(writeService, bucketDB, bucketDBHandlerInitializer), _queryLimiter(), _clock(), - _ctx(_fastUpdCtx._ctx, _queryLimiter, - _clock, dynamic_cast<vespalib::SyncableThreadExecutor &>(writeService.shared())) + _ctx(_fastUpdCtx._ctx, _queryLimiter, _clock, writeService.shared()) {} MySearchableContext::~MySearchableContext() = default; diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h index 4eaa722e0ba..632f4482654 100644 --- a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h +++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h @@ -119,7 +119,7 @@ public: /** * Returns the underlying executor. Only used for state explorers. */ - const vespalib::SyncableThreadExecutor& get_executor() const { return _executor; } + const vespalib::ThreadExecutor& get_executor() const { return _executor; } /** * Starts the scheduling thread of this manager. diff --git a/searchcore/src/vespa/searchcore/proton/matchengine/matchengine.h b/searchcore/src/vespa/searchcore/proton/matchengine/matchengine.h index 7cc0c97048b..3d3be775a4a 100644 --- a/searchcore/src/vespa/searchcore/proton/matchengine/matchengine.h +++ b/searchcore/src/vespa/searchcore/proton/matchengine/matchengine.h @@ -69,7 +69,7 @@ public: /** * Returns the underlying executor. Only used for state explorers. */ - const vespalib::SyncableThreadExecutor& get_executor() const { return _executor; } + const vespalib::ThreadExecutor& get_executor() const { return _executor; } /** * Closes the request handler interface. This will prevent any more data diff --git a/searchcore/src/vespa/searchcore/proton/server/i_proton_configurer_owner.h b/searchcore/src/vespa/searchcore/proton/server/i_proton_configurer_owner.h index 0d6bb07b173..704b54dc566 100644 --- a/searchcore/src/vespa/searchcore/proton/server/i_proton_configurer_owner.h +++ b/searchcore/src/vespa/searchcore/proton/server/i_proton_configurer_owner.h @@ -6,8 +6,6 @@ #include <vespa/vespalib/stllike/string.h> #include <memory> -namespace vespalib { class ThreadStackExecutorBase; } - namespace proton { class DocumentDBConfigOwner; @@ -19,7 +17,7 @@ class DocumentDBConfigOwner; class IProtonConfigurerOwner { public: - using InitializeThreads = std::shared_ptr<vespalib::SyncableThreadExecutor>; + using InitializeThreads = std::shared_ptr<vespalib::ThreadExecutor>; virtual ~IProtonConfigurerOwner() { } virtual std::shared_ptr<DocumentDBConfigOwner> addDocumentDB(const DocTypeName &docTypeName, document::BucketSpace bucketSpace, diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.h b/searchcore/src/vespa/searchcore/proton/server/proton.h index c57d3e26d8b..91635dc7497 100644 --- a/searchcore/src/vespa/searchcore/proton/server/proton.h +++ b/searchcore/src/vespa/searchcore/proton/server/proton.h @@ -59,8 +59,7 @@ private: using MonitorReply = search::engine::MonitorReply; using MonitorClient = search::engine::MonitorClient; using DocumentDBMap = std::map<DocTypeName, DocumentDB::SP>; - using ProtonConfigSP = BootstrapConfig::ProtonConfigSP; - using InitializeThreads = std::shared_ptr<vespalib::SyncableThreadExecutor>; + using InitializeThreads = std::shared_ptr<vespalib::ThreadExecutor>; using BucketSpace = document::BucketSpace; class ProtonFileHeaderContext : public search::common::FileHeaderContext diff --git a/searchcore/src/vespa/searchcore/proton/server/proton_configurer.cpp b/searchcore/src/vespa/searchcore/proton/server/proton_configurer.cpp index 7c998ceca7c..2c891927fa3 100644 --- a/searchcore/src/vespa/searchcore/proton/server/proton_configurer.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/proton_configurer.cpp @@ -12,6 +12,7 @@ #include <vespa/document/bucket/fixed_bucket_spaces.h> #include <vespa/config-bucketspaces.h> #include <vespa/vespalib/util/exceptions.h> +#include <vespa/vespalib/util/retain_guard.h> #include <vespa/vespalib/stllike/asciistream.h> #include <future> @@ -42,7 +43,7 @@ getBucketSpace(const BootstrapConfig &bootstrapConfig, const DocTypeName &name) } -ProtonConfigurer::ProtonConfigurer(vespalib::SyncableThreadExecutor &executor, +ProtonConfigurer::ProtonConfigurer(vespalib::ThreadExecutor &executor, IProtonConfigurerOwner &owner, const std::unique_ptr<IProtonDiskLayout> &diskLayout) : IProtonConfigurer(), @@ -58,9 +59,22 @@ ProtonConfigurer::ProtonConfigurer(vespalib::SyncableThreadExecutor &executor, { } -ProtonConfigurer::~ProtonConfigurer() -{ -} +class ProtonConfigurer::ReconfigureTask : public vespalib::Executor::Task { +public: + ReconfigureTask(ProtonConfigurer & configurer) + : _configurer(configurer), + _retainGuard(configurer._pendingReconfigureTasks) + {} + + void run() override { + _configurer.performReconfigure(); + } +private: + ProtonConfigurer & _configurer; + vespalib::RetainGuard _retainGuard; +}; + +ProtonConfigurer::~ProtonConfigurer() = default; void ProtonConfigurer::setAllowReconfig(bool allowReconfig) @@ -72,11 +86,12 @@ ProtonConfigurer::setAllowReconfig(bool allowReconfig) _allowReconfig = allowReconfig; if (allowReconfig) { // Ensure that pending config is applied - _executor.execute(makeLambdaTask([this]() { performReconfigure(); })); + _executor.execute(std::make_unique<ReconfigureTask>(*this)); } } if (!allowReconfig) { - _executor.sync(); // drain queued performReconfigure tasks + // drain queued performReconfigure tasks + _pendingReconfigureTasks.waitForZeroRefCount(); } } @@ -102,7 +117,7 @@ ProtonConfigurer::reconfigure(std::shared_ptr<ProtonConfigSnapshot> configSnapsh std::lock_guard<std::mutex> guard(_mutex); _pendingConfigSnapshot = configSnapshot; if (_allowReconfig) { - _executor.execute(makeLambdaTask([&]() { performReconfigure(); })); + _executor.execute(std::make_unique<ReconfigureTask>(*this)); } } diff --git a/searchcore/src/vespa/searchcore/proton/server/proton_configurer.h b/searchcore/src/vespa/searchcore/proton/server/proton_configurer.h index 3ebfbc378d7..ddb9c1bed92 100644 --- a/searchcore/src/vespa/searchcore/proton/server/proton_configurer.h +++ b/searchcore/src/vespa/searchcore/proton/server/proton_configurer.h @@ -7,6 +7,7 @@ #include <vespa/document/bucket/bucketspace.h> #include <vespa/searchcore/proton/common/doctypename.h> #include <vespa/vespalib/net/simple_component_config_producer.h> +#include <vespa/vespalib/util/monitored_refcount.h> #include <map> #include <mutex> @@ -25,17 +26,19 @@ class IProtonDiskLayout; class ProtonConfigurer : public IProtonConfigurer { using DocumentDBs = std::map<DocTypeName, std::pair<std::weak_ptr<IDocumentDBConfigOwner>, std::weak_ptr<DocumentDBDirectoryHolder>>>; - using InitializeThreads = std::shared_ptr<vespalib::SyncableThreadExecutor>; + using InitializeThreads = std::shared_ptr<vespalib::ThreadExecutor>; + class ReconfigureTask; - SyncableExecutorThreadService _executor; - IProtonConfigurerOwner &_owner; - DocumentDBs _documentDBs; - std::shared_ptr<ProtonConfigSnapshot> _pendingConfigSnapshot; - std::shared_ptr<ProtonConfigSnapshot> _activeConfigSnapshot; - mutable std::mutex _mutex; - bool _allowReconfig; - vespalib::SimpleComponentConfigProducer _componentConfig; + ExecutorThreadService _executor; + IProtonConfigurerOwner &_owner; + DocumentDBs _documentDBs; + std::shared_ptr<ProtonConfigSnapshot> _pendingConfigSnapshot; + std::shared_ptr<ProtonConfigSnapshot> _activeConfigSnapshot; + mutable std::mutex _mutex; + bool _allowReconfig; + vespalib::SimpleComponentConfigProducer _componentConfig; const std::unique_ptr<IProtonDiskLayout> &_diskLayout; + vespalib::MonitoredRefCount _pendingReconfigureTasks; void performReconfigure(); bool skipConfig(const ProtonConfigSnapshot *configSnapshot, bool initialConfig); @@ -48,7 +51,7 @@ class ProtonConfigurer : public IProtonConfigurer void pruneInitialDocumentDBDirs(const ProtonConfigSnapshot &configSnapshot); public: - ProtonConfigurer(vespalib::SyncableThreadExecutor &executor, + ProtonConfigurer(vespalib::ThreadExecutor &executor, IProtonConfigurerOwner &owner, const std::unique_ptr<IProtonDiskLayout> &diskLayout); diff --git a/searchcore/src/vespa/searchcore/proton/summaryengine/summaryengine.h b/searchcore/src/vespa/searchcore/proton/summaryengine/summaryengine.h index 7f6d9328491..c49649de1e3 100644 --- a/searchcore/src/vespa/searchcore/proton/summaryengine/summaryengine.h +++ b/searchcore/src/vespa/searchcore/proton/summaryengine/summaryengine.h @@ -72,7 +72,7 @@ public: /** * Returns the underlying executor. Only used for state explorers. */ - const vespalib::SyncableThreadExecutor& get_executor() const { return _executor; } + const vespalib::ThreadExecutor& get_executor() const { return _executor; } /** * Starts the underlying threads. This will throw a vespalib::Exception if diff --git a/searchcorespi/src/vespa/searchcorespi/index/warmupindexcollection.cpp b/searchcorespi/src/vespa/searchcorespi/index/warmupindexcollection.cpp index 0a24efd0e66..b9cbdab1c0a 100644 --- a/searchcorespi/src/vespa/searchcorespi/index/warmupindexcollection.cpp +++ b/searchcorespi/src/vespa/searchcorespi/index/warmupindexcollection.cpp @@ -40,7 +40,7 @@ WarmupIndexCollection::WarmupIndexCollection(const WarmupConfig & warmupConfig, _warmupDone(warmupDone), _warmupEndTime(vespalib::steady_clock::now() + warmupConfig.getDuration()), _handledTerms(std::make_unique<FieldTermMap>()), - _pendingTasks(0) + _pendingTasks() { if (_next->valid()) { setCurrentIndex(_next->getCurrentIndex()); @@ -81,7 +81,7 @@ WarmupIndexCollection::~WarmupIndexCollection() if (_warmupEndTime != vespalib::steady_time()) { LOG(info, "Warmup aborted due to new state change or application shutdown"); } - assert(_pendingTasks == 0); + assert(_pendingTasks.has_zero_ref_count()); } const ISourceSelector & @@ -220,23 +220,19 @@ WarmupIndexCollection::getSearchableSP(uint32_t i) const void WarmupIndexCollection::drainPending() { - while (_pendingTasks > 0) { - std::this_thread::sleep_for(1ms); - } + _pendingTasks.waitForZeroRefCount(); } WarmupIndexCollection::WarmupTask::WarmupTask(std::unique_ptr<MatchData> md, std::shared_ptr<WarmupIndexCollection> warmup) : _warmup(std::move(warmup)), + _retainGuard(_warmup->_pendingTasks), _matchData(std::move(md)), _bluePrint(), _requestContext() { - _warmup->_pendingTasks++; } -WarmupIndexCollection::WarmupTask::~WarmupTask() { - _warmup->_pendingTasks--; -} +WarmupIndexCollection::WarmupTask::~WarmupTask() = default; void WarmupIndexCollection::WarmupTask::run() diff --git a/searchcorespi/src/vespa/searchcorespi/index/warmupindexcollection.h b/searchcorespi/src/vespa/searchcorespi/index/warmupindexcollection.h index 5292fb36298..b0b2952bee8 100644 --- a/searchcorespi/src/vespa/searchcorespi/index/warmupindexcollection.h +++ b/searchcorespi/src/vespa/searchcorespi/index/warmupindexcollection.h @@ -5,6 +5,8 @@ #include "isearchableindexcollection.h" #include "warmupconfig.h" #include <vespa/vespalib/util/threadexecutor.h> +#include <vespa/vespalib/util/monitored_refcount.h> +#include <vespa/vespalib/util/retain_guard.h> #include <vespa/searchlib/queryeval/fake_requestcontext.h> namespace searchcorespi { @@ -85,9 +87,10 @@ private: private: void run() override; std::shared_ptr<WarmupIndexCollection> _warmup; - std::unique_ptr<MatchData> _matchData; - Blueprint::UP _bluePrint; - FakeRequestContext _requestContext; + vespalib::RetainGuard _retainGuard; + std::unique_ptr<MatchData> _matchData; + Blueprint::UP _bluePrint; + FakeRequestContext _requestContext; }; void fireWarmup(Task::UP task); @@ -102,7 +105,7 @@ private: vespalib::steady_time _warmupEndTime; std::mutex _lock; std::unique_ptr<FieldTermMap> _handledTerms; - std::atomic<uint64_t> _pendingTasks; + vespalib::MonitoredRefCount _pendingTasks; }; } // namespace searchcorespi diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp index 954a63978f3..7c4711b6802 100644 --- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp @@ -167,7 +167,7 @@ SequencedTaskExecutor::getExecutorIdImPerfect(uint64_t componentId) const { return ExecutorId(executorId); } -const vespalib::SyncableThreadExecutor* +const vespalib::ThreadExecutor* SequencedTaskExecutor::first_executor() const { if (_executors.empty()) { diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h index 7bb56424849..db0723d16c8 100644 --- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h +++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h @@ -8,6 +8,7 @@ namespace vespalib { +class ThreadExecutor; class SyncableThreadExecutor; /** @@ -41,7 +42,7 @@ public: */ uint32_t getComponentHashSize() const { return _component2IdImperfect.size(); } uint32_t getComponentEffectiveHashSize() const { return _nextId; } - const vespalib::SyncableThreadExecutor* first_executor() const; + const vespalib::ThreadExecutor* first_executor() const; private: explicit SequencedTaskExecutor(std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>> executor); |