diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-03-23 16:01:22 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2020-03-23 16:13:09 +0000 |
commit | 36b3be76ae151682295faa04de17989f3600c29b (patch) | |
tree | fbe5156d396bfa694f9acd932b466088c20d2e63 /searchcore | |
parent | 23dcf1db150a66ec66cf746d4234982fdbb0e6e2 (diff) |
Add shutdown to thread interface.
Let the optimize config control index and summary executor too.
Diffstat (limited to 'searchcore')
15 files changed, 79 insertions, 53 deletions
diff --git a/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp b/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp index 30fed6fa49e..6c9ffc210a1 100644 --- a/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp +++ b/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp @@ -55,7 +55,6 @@ using storage::spi::RemoveResult; using storage::spi::Result; using storage::spi::Timestamp; using storage::spi::UpdateResult; -using vespalib::BlockingThreadStackExecutor; using vespalib::ThreadStackExecutor; using vespalib::ThreadStackExecutorBase; using vespalib::makeClosure; diff --git a/searchcore/src/tests/proton/index/indexmanager_test.cpp b/searchcore/src/tests/proton/index/indexmanager_test.cpp index 264cf6d8cfa..8cc075773f7 100644 --- a/searchcore/src/tests/proton/index/indexmanager_test.cpp +++ b/searchcore/src/tests/proton/index/indexmanager_test.cpp @@ -48,7 +48,6 @@ using search::memoryindex::FieldIndexCollection; using search::queryeval::Source; using std::set; using std::string; -using vespalib::BlockingThreadStackExecutor; using vespalib::ThreadStackExecutor; using vespalib::makeLambdaTask; using std::chrono::duration_cast; diff --git a/searchcore/src/tests/proton/proton_configurer/proton_configurer_test.cpp b/searchcore/src/tests/proton/proton_configurer/proton_configurer_test.cpp index dfb1268aaa6..c26b008f769 100644 --- a/searchcore/src/tests/proton/proton_configurer/proton_configurer_test.cpp +++ b/searchcore/src/tests/proton/proton_configurer/proton_configurer_test.cpp @@ -243,7 +243,6 @@ struct MyLog struct MyProtonConfigurerOwner : public IProtonConfigurerOwner, public MyLog { - using InitializeThreads = std::shared_ptr<vespalib::ThreadStackExecutorBase>; vespalib::ThreadStackExecutor _executor; std::map<DocTypeName, std::shared_ptr<MyDocumentDBConfigOwner>> _dbs; @@ -254,9 +253,9 @@ struct MyProtonConfigurerOwner : public IProtonConfigurerOwner, _dbs() { } - virtual ~MyProtonConfigurerOwner() { } + ~MyProtonConfigurerOwner() { } - virtual std::shared_ptr<DocumentDBConfigOwner> addDocumentDB(const DocTypeName &docTypeName, + std::shared_ptr<DocumentDBConfigOwner> addDocumentDB(const DocTypeName &docTypeName, document::BucketSpace bucketSpace, const vespalib::string &configId, const std::shared_ptr<BootstrapConfig> &bootstrapConfig, @@ -275,14 +274,14 @@ struct MyProtonConfigurerOwner : public IProtonConfigurerOwner, _log.push_back(os.str()); return db; } - virtual void removeDocumentDB(const DocTypeName &docTypeName) override { + void removeDocumentDB(const DocTypeName &docTypeName) override { ASSERT_FALSE(_dbs.find(docTypeName) == _dbs.end()); _dbs.erase(docTypeName); std::ostringstream os; os << "remove db " << docTypeName.getName(); _log.push_back(os.str()); } - virtual void applyConfig(const std::shared_ptr<BootstrapConfig> &bootstrapConfig) override { + void applyConfig(const std::shared_ptr<BootstrapConfig> &bootstrapConfig) override { std::ostringstream os; os << "apply config " << bootstrapConfig->getGeneration(); _log.push_back(os.str()); diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.h b/searchcore/src/vespa/searchcore/proton/server/documentdb.h index f296e264903..1a1d97a657b 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentdb.h +++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.h @@ -85,7 +85,7 @@ private: DocumentStoreCacheStats() : total(), readySubDb(), notReadySubDb(), removedSubDb() {} }; - using InitializeThreads = std::shared_ptr<vespalib::ThreadStackExecutorBase>; + using InitializeThreads = std::shared_ptr<vespalib::SyncableThreadExecutor>; using IFlushTargetList = std::vector<std::shared_ptr<searchcorespi::IFlushTarget>>; using StatusReportUP = std::unique_ptr<StatusReport>; using ProtonConfig = const vespa::config::search::core::internal::InternalProtonType; diff --git a/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.cpp b/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.cpp index 6ca385711b0..b13fa2baed3 100644 --- a/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.cpp @@ -2,13 +2,14 @@ #include "executor_thread_service.h" #include <vespa/vespalib/util/lambdatask.h> +#include <vespa/vespalib/util/gate.h> #include <vespa/fastos/thread.h> using vespalib::makeLambdaTask; using vespalib::Executor; using vespalib::Gate; using vespalib::Runnable; -using vespalib::ThreadStackExecutorBase; +using vespalib::SyncableThreadExecutor; namespace proton { @@ -28,7 +29,7 @@ sampleThreadId(FastOS_ThreadId *threadId) } std::unique_ptr<internal::ThreadId> -getThreadId(ThreadStackExecutorBase &executor) +getThreadId(SyncableThreadExecutor &executor) { std::unique_ptr<internal::ThreadId> id = std::make_unique<internal::ThreadId>(); executor.execute(makeLambdaTask([threadId=&id->_id] { sampleThreadId(threadId);})); @@ -45,7 +46,7 @@ runRunnable(Runnable *runnable, Gate *gate) } // namespace -ExecutorThreadService::ExecutorThreadService(ThreadStackExecutorBase &executor) +ExecutorThreadService::ExecutorThreadService(SyncableThreadExecutor &executor) : _executor(executor), _threadId(getThreadId(executor)) { diff --git a/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.h b/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.h index ccdfb6b72cd..26069b4b8dd 100644 --- a/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.h +++ b/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.h @@ -2,7 +2,7 @@ #pragma once #include <vespa/searchcorespi/index/i_thread_service.h> -#include <vespa/vespalib/util/threadstackexecutorbase.h> +#include <vespa/vespalib/util/threadexecutor.h> namespace proton { @@ -14,11 +14,11 @@ namespace internal { struct ThreadId; } class ExecutorThreadService : public searchcorespi::index::IThreadService { private: - vespalib::ThreadStackExecutorBase &_executor; + vespalib::SyncableThreadExecutor &_executor; std::unique_ptr<internal::ThreadId> _threadId; public: - ExecutorThreadService(vespalib::ThreadStackExecutorBase &executor); + ExecutorThreadService(vespalib::SyncableThreadExecutor &executor); ~ExecutorThreadService(); Stats getStats() override; @@ -31,6 +31,10 @@ public: _executor.sync(); return *this; } + ExecutorThreadService & shutdown() override { + _executor.shutdown(); + return *this; + } bool isCurrentThread() const override; size_t getNumThreads() const override { return _executor.getNumThreads(); } diff --git a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp index 6e7b4967f6d..a725b00d485 100644 --- a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp @@ -3,23 +3,42 @@ #include "executorthreadingservice.h" #include <vespa/searchcore/proton/metrics/executor_threading_service_stats.h> #include <vespa/searchlib/common/sequencedtaskexecutor.h> +#include <vespa/vespalib/util/singleexecutor.h> +#include <vespa/vespalib/util/blockingthreadstackexecutor.h> -using vespalib::ThreadStackExecutorBase; + +using vespalib::SyncableThreadExecutor; +using vespalib::BlockingThreadStackExecutor; +using vespalib::SingleExecutor; using search::SequencedTaskExecutor; +using OptimizeFor = vespalib::Executor::OptimizeFor; namespace proton { -ExecutorThreadingService::ExecutorThreadingService(vespalib::ThreadStackExecutorBase & sharedExecutor, +namespace { + +std::unique_ptr<SyncableThreadExecutor> +createExecutorWithOneThread(uint32_t stackSize, uint32_t taskLimit, OptimizeFor optimize) { + if (optimize == OptimizeFor::THROUGHPUT) { + return std::make_unique<SingleExecutor>(taskLimit); + } else { + return std::make_unique<BlockingThreadStackExecutor>(1, stackSize, taskLimit); + } +} + +} + +ExecutorThreadingService::ExecutorThreadingService(vespalib::SyncableThreadExecutor & sharedExecutor, uint32_t threads, uint32_t stackSize, uint32_t taskLimit, OptimizeFor optimize) : _sharedExecutor(sharedExecutor), _masterExecutor(1, stackSize), - _indexExecutor(1, stackSize, taskLimit), - _summaryExecutor(1, stackSize, taskLimit), + _indexExecutor(createExecutorWithOneThread(stackSize, taskLimit, optimize)), + _summaryExecutor(createExecutorWithOneThread(stackSize, taskLimit, optimize)), _masterService(_masterExecutor), - _indexService(_indexExecutor), - _summaryService(_summaryExecutor), + _indexService(*_indexExecutor), + _summaryService(*_summaryExecutor), _indexFieldInverter(SequencedTaskExecutor::create(threads, taskLimit)), _indexFieldWriter(SequencedTaskExecutor::create(threads, taskLimit)), _attributeFieldWriter(SequencedTaskExecutor::create(threads, taskLimit, optimize)) @@ -36,8 +55,8 @@ ExecutorThreadingService::sync() _masterExecutor.sync(); } _attributeFieldWriter->sync(); - _indexExecutor.sync(); - _summaryExecutor.sync(); + _indexExecutor->sync(); + _summaryExecutor->sync(); _indexFieldInverter->sync(); _indexFieldWriter->sync(); if (!isMasterThread) { @@ -52,10 +71,10 @@ ExecutorThreadingService::shutdown() _masterExecutor.shutdown(); _masterExecutor.sync(); _attributeFieldWriter->sync(); - _summaryExecutor.shutdown(); - _summaryExecutor.sync(); - _indexExecutor.shutdown(); - _indexExecutor.sync(); + _summaryExecutor->shutdown(); + _summaryExecutor->sync(); + _indexExecutor->shutdown(); + _indexExecutor->sync(); _indexFieldInverter->sync(); _indexFieldWriter->sync(); } @@ -63,8 +82,8 @@ ExecutorThreadingService::shutdown() void ExecutorThreadingService::setTaskLimit(uint32_t taskLimit, uint32_t summaryTaskLimit) { - _indexExecutor.setTaskLimit(taskLimit); - _summaryExecutor.setTaskLimit(summaryTaskLimit); + _indexExecutor->setTaskLimit(taskLimit); + _summaryExecutor->setTaskLimit(summaryTaskLimit); _indexFieldInverter->setTaskLimit(taskLimit); _indexFieldWriter->setTaskLimit(taskLimit); _attributeFieldWriter->setTaskLimit(taskLimit); @@ -74,8 +93,8 @@ ExecutorThreadingServiceStats ExecutorThreadingService::getStats() { return ExecutorThreadingServiceStats(_masterExecutor.getStats(), - _indexExecutor.getStats(), - _summaryExecutor.getStats(), + _indexExecutor->getStats(), + _summaryExecutor->getStats(), _sharedExecutor.getStats(), _indexFieldInverter->getStats(), _indexFieldWriter->getStats(), diff --git a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h index 2e4dd2035f3..4d018e2b6f3 100644 --- a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h +++ b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h @@ -3,7 +3,6 @@ #include "executor_thread_service.h" #include <vespa/searchcorespi/index/ithreadingservice.h> -#include <vespa/vespalib/util/blockingthreadstackexecutor.h> #include <vespa/vespalib/util/threadstackexecutor.h> namespace proton { @@ -17,16 +16,16 @@ class ExecutorThreadingServiceStats; class ExecutorThreadingService : public searchcorespi::index::IThreadingService { private: - vespalib::ThreadStackExecutorBase & _sharedExecutor; - vespalib::ThreadStackExecutor _masterExecutor; - vespalib::BlockingThreadStackExecutor _indexExecutor; - vespalib::BlockingThreadStackExecutor _summaryExecutor; - ExecutorThreadService _masterService; - ExecutorThreadService _indexService; - ExecutorThreadService _summaryService; - std::unique_ptr<search::ISequencedTaskExecutor> _indexFieldInverter; - std::unique_ptr<search::ISequencedTaskExecutor> _indexFieldWriter; - std::unique_ptr<search::ISequencedTaskExecutor> _attributeFieldWriter; + vespalib::SyncableThreadExecutor & _sharedExecutor; + vespalib::ThreadStackExecutor _masterExecutor; + std::unique_ptr<vespalib::SyncableThreadExecutor> _indexExecutor; + std::unique_ptr<vespalib::SyncableThreadExecutor> _summaryExecutor; + ExecutorThreadService _masterService; + ExecutorThreadService _indexService; + ExecutorThreadService _summaryService; + std::unique_ptr<search::ISequencedTaskExecutor> _indexFieldInverter; + std::unique_ptr<search::ISequencedTaskExecutor> _indexFieldWriter; + std::unique_ptr<search::ISequencedTaskExecutor> _attributeFieldWriter; public: using OptimizeFor = vespalib::Executor::OptimizeFor; @@ -36,7 +35,7 @@ public: * @stackSize The size of the stack of the underlying executors. * @taskLimit The task limit for the index executor. */ - ExecutorThreadingService(vespalib::ThreadStackExecutorBase &sharedExecutor, + ExecutorThreadingService(vespalib::SyncableThreadExecutor &sharedExecutor, uint32_t threads = 1, uint32_t stackSize = 128 * 1024, uint32_t taskLimit = 1000, @@ -56,11 +55,11 @@ public: vespalib::ThreadStackExecutorBase &getMasterExecutor() { return _masterExecutor; } - vespalib::ThreadStackExecutorBase &getIndexExecutor() { - return _indexExecutor; + vespalib::SyncableThreadExecutor &getIndexExecutor() { + return *_indexExecutor; } - vespalib::ThreadStackExecutorBase &getSummaryExecutor() { - return _summaryExecutor; + vespalib::SyncableThreadExecutor &getSummaryExecutor() { + return *_summaryExecutor; } /** @@ -76,7 +75,7 @@ public: searchcorespi::index::IThreadService &summary() override { return _summaryService; } - vespalib::ThreadExecutor &shared() override { + vespalib::SyncableThreadExecutor &shared() override { return _sharedExecutor; } diff --git a/searchcore/src/vespa/searchcore/proton/server/i_proton_configurer_owner.h b/searchcore/src/vespa/searchcore/proton/server/i_proton_configurer_owner.h index fec8430e41d..5a457b168ec 100644 --- a/searchcore/src/vespa/searchcore/proton/server/i_proton_configurer_owner.h +++ b/searchcore/src/vespa/searchcore/proton/server/i_proton_configurer_owner.h @@ -18,8 +18,8 @@ class DocumentDBConfigOwner; */ class IProtonConfigurerOwner { - using InitializeThreads = std::shared_ptr<vespalib::ThreadStackExecutorBase>; public: + using InitializeThreads = std::shared_ptr<vespalib::SyncableThreadExecutor>; virtual ~IProtonConfigurerOwner() { } virtual std::shared_ptr<DocumentDBConfigOwner> addDocumentDB(const DocTypeName &docTypeName, document::BucketSpace bucketSpace, diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.cpp b/searchcore/src/vespa/searchcore/proton/server/proton.cpp index 20de5bb07c1..3f8db3f2ff9 100644 --- a/searchcore/src/vespa/searchcore/proton/server/proton.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/proton.cpp @@ -32,6 +32,7 @@ #include <vespa/vespalib/util/host_name.h> #include <vespa/vespalib/util/random.h> #include <vespa/vespalib/net/state_server.h> +#include <vespa/vespalib/util/blockingthreadstackexecutor.h> #include <vespa/searchlib/aggregation/forcelink.hpp> #include <vespa/searchlib/expression/forcelink.hpp> diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.h b/searchcore/src/vespa/searchcore/proton/server/proton.h index 410f45162e4..4c9d4c77cc4 100644 --- a/searchcore/src/vespa/searchcore/proton/server/proton.h +++ b/searchcore/src/vespa/searchcore/proton/server/proton.h @@ -57,7 +57,7 @@ private: typedef search::engine::MonitorClient MonitorClient; typedef std::map<DocTypeName, DocumentDB::SP> DocumentDBMap; typedef BootstrapConfig::ProtonConfigSP ProtonConfigSP; - using InitializeThreads = std::shared_ptr<vespalib::ThreadStackExecutorBase>; + using InitializeThreads = std::shared_ptr<vespalib::SyncableThreadExecutor>; using BucketSpace = document::BucketSpace; struct MetricsUpdateHook : metrics::UpdateHook diff --git a/searchcore/src/vespa/searchcore/proton/server/proton_configurer.cpp b/searchcore/src/vespa/searchcore/proton/server/proton_configurer.cpp index 0b9293a4aab..45e3c978dd9 100644 --- a/searchcore/src/vespa/searchcore/proton/server/proton_configurer.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/proton_configurer.cpp @@ -39,7 +39,7 @@ getBucketSpace(const BootstrapConfig &bootstrapConfig, const DocTypeName &name) } -ProtonConfigurer::ProtonConfigurer(vespalib::ThreadStackExecutorBase &executor, +ProtonConfigurer::ProtonConfigurer(vespalib::SyncableThreadExecutor &executor, IProtonConfigurerOwner &owner, const std::unique_ptr<IProtonDiskLayout> &diskLayout) : IProtonConfigurer(), diff --git a/searchcore/src/vespa/searchcore/proton/server/proton_configurer.h b/searchcore/src/vespa/searchcore/proton/server/proton_configurer.h index c896f12bd4f..54399a26365 100644 --- a/searchcore/src/vespa/searchcore/proton/server/proton_configurer.h +++ b/searchcore/src/vespa/searchcore/proton/server/proton_configurer.h @@ -25,7 +25,7 @@ class IProtonDiskLayout; class ProtonConfigurer : public IProtonConfigurer { using DocumentDBs = std::map<DocTypeName, std::pair<std::weak_ptr<IDocumentDBConfigOwner>, std::weak_ptr<DocumentDBDirectoryHolder>>>; - using InitializeThreads = std::shared_ptr<vespalib::ThreadStackExecutorBase>; + using InitializeThreads = std::shared_ptr<vespalib::SyncableThreadExecutor>; ExecutorThreadService _executor; IProtonConfigurerOwner &_owner; @@ -48,11 +48,11 @@ class ProtonConfigurer : public IProtonConfigurer void pruneInitialDocumentDBDirs(const ProtonConfigSnapshot &configSnapshot); public: - ProtonConfigurer(vespalib::ThreadStackExecutorBase &executor, + ProtonConfigurer(vespalib::SyncableThreadExecutor &executor, IProtonConfigurerOwner &owner, const std::unique_ptr<IProtonDiskLayout> &diskLayout); - ~ProtonConfigurer(); + ~ProtonConfigurer() override; void setAllowReconfig(bool allowReconfig); diff --git a/searchcore/src/vespa/searchcore/proton/test/thread_service_observer.h b/searchcore/src/vespa/searchcore/proton/test/thread_service_observer.h index 766bdeeefb0..127b696c4ab 100644 --- a/searchcore/src/vespa/searchcore/proton/test/thread_service_observer.h +++ b/searchcore/src/vespa/searchcore/proton/test/thread_service_observer.h @@ -31,6 +31,10 @@ public: _service.sync(); return *this; } + ThreadServiceObserver &shutdown() override { + _service.shutdown(); + return *this; + } bool isCurrentThread() const override { return _service.isCurrentThread(); } diff --git a/searchcore/src/vespa/searchcore/proton/test/threading_service_observer.h b/searchcore/src/vespa/searchcore/proton/test/threading_service_observer.h index 7ac9c0c68f2..23c62d179b1 100644 --- a/searchcore/src/vespa/searchcore/proton/test/threading_service_observer.h +++ b/searchcore/src/vespa/searchcore/proton/test/threading_service_observer.h @@ -69,6 +69,7 @@ public: search::ISequencedTaskExecutor &attributeFieldWriter() override { return _attributeFieldWriter; } + }; } |