diff options
16 files changed, 174 insertions, 48 deletions
diff --git a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp index 2c21a30396d..9bc374b8386 100644 --- a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp +++ b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp @@ -35,18 +35,18 @@ struct ControllerFixtureBase : public ::testing::Test test::BucketHandler _bucketHandler; MyBucketModifiedHandler _modifiedHandler; std::shared_ptr<bucketdb::BucketDBOwner> _bucketDB; - MySubDb _ready; - MySubDb _notReady; - BucketCreateNotifier _bucketCreateNotifier; - test::DiskMemUsageNotifier _diskMemUsageNotifier; - MonitoredRefCount _refCount; - ThreadStackExecutor _singleExecutor; - ExecutorThreadService _master; - DummyBucketExecutor _bucketExecutor; - MyMoveHandler _moveHandler; - DocumentDBTaggedMetrics _metrics; + MySubDb _ready; + MySubDb _notReady; + BucketCreateNotifier _bucketCreateNotifier; + test::DiskMemUsageNotifier _diskMemUsageNotifier; + MonitoredRefCount _refCount; + ThreadStackExecutor _singleExecutor; + SyncableExecutorThreadService _master; + DummyBucketExecutor _bucketExecutor; + MyMoveHandler _moveHandler; + DocumentDBTaggedMetrics _metrics; std::shared_ptr<BucketMoveJob> _bmj; - MyCountJobRunner _runner; + MyCountJobRunner _runner; ControllerFixtureBase(const BlockableMaintenanceJobConfig &blockableConfig, bool storeMoveDoneContexts); ~ControllerFixtureBase(); ControllerFixtureBase &addReady(const BucketId &bucket) { diff --git a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.cpp b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.cpp index 13955953eb5..8f88d678c0c 100644 --- a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.cpp +++ b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.cpp @@ -54,7 +54,7 @@ JobTestBase::init(uint32_t allowedLidBloat, _job.reset(); _singleExecutor = std::make_unique<vespalib::ThreadStackExecutor>(1, 0x10000); - _master = std::make_unique<proton::ExecutorThreadService> (*_singleExecutor); + _master = std::make_unique<proton::SyncableExecutorThreadService> (*_singleExecutor); _bucketExecutor = std::make_unique<storage::spi::dummy::DummyBucketExecutor>(4); _job = lidspace::CompactionJob::create(compactCfg, RetainGuard(_refCount), _handler, _storer, *_master, *_bucketExecutor, _diskMemUsageNotifier, blockableCfg, _clusterStateHandler, nodeRetired, diff --git a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.h b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.h index 14f2ff42dbe..5875910f4d9 100644 --- a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.h +++ b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_jobtest.h @@ -14,7 +14,7 @@ struct JobTestBase : public ::testing::Test { test::DiskMemUsageNotifier _diskMemUsageNotifier; std::unique_ptr<storage::spi::dummy::DummyBucketExecutor> _bucketExecutor; std::unique_ptr<vespalib::SyncableThreadExecutor> _singleExecutor; - std::unique_ptr<searchcorespi::index::IThreadService> _master; + std::unique_ptr<searchcorespi::index::ISyncableThreadService> _master; std::shared_ptr<MyHandler> _handler; MyStorer _storer; std::shared_ptr<BlockableMaintenanceJob> _job; diff --git a/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp b/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp index 8940b01b91d..227e885564d 100644 --- a/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp +++ b/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp @@ -326,7 +326,7 @@ class MaintenanceControllerFixture public: MyExecutor _executor; MyExecutor _genericExecutor; - ExecutorThreadService _threadService; + SyncableExecutorThreadService _threadService; DummyBucketExecutor _bucketExecutor; DocTypeName _docTypeName; test::UserDocumentsBuilder _builder; diff --git a/searchcore/src/tests/proton/index/indexmanager_test.cpp b/searchcore/src/tests/proton/index/indexmanager_test.cpp index d6bbc77aa09..1e33482b055 100644 --- a/searchcore/src/tests/proton/index/indexmanager_test.cpp +++ b/searchcore/src/tests/proton/index/indexmanager_test.cpp @@ -210,8 +210,8 @@ IndexManagerTest::resetIndexManager() { _index_manager.reset(); _index_manager = std::make_unique<IndexManager>(index_dir, IndexConfig(), getSchema(), 1, - _reconfigurer, _writeService, _writeService.master(), - TuneFileIndexManager(), TuneFileAttributes(),_fileHeaderContext); + _reconfigurer, _writeService, _sharedExecutor, + TuneFileIndexManager(), TuneFileAttributes(), _fileHeaderContext); } void diff --git a/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.cpp b/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.cpp index 684132b34e7..74f6a622661 100644 --- a/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.cpp @@ -9,6 +9,7 @@ using vespalib::makeLambdaTask; using vespalib::Executor; using vespalib::Gate; using vespalib::Runnable; +using vespalib::ThreadExecutor; using vespalib::SyncableThreadExecutor; namespace proton { @@ -29,11 +30,15 @@ sampleThreadId(FastOS_ThreadId *threadId) } std::unique_ptr<internal::ThreadId> -getThreadId(SyncableThreadExecutor &executor) +getThreadId(ThreadExecutor &executor) { std::unique_ptr<internal::ThreadId> id = std::make_unique<internal::ThreadId>(); - executor.execute(makeLambdaTask([threadId=&id->_id] { sampleThreadId(threadId);})); - executor.sync(); + vespalib::Gate gate; + executor.execute(makeLambdaTask([threadId=&id->_id, &gate] { + sampleThreadId(threadId); + gate.countDown(); + })); + gate.await(); return id; } @@ -46,7 +51,7 @@ runRunnable(Runnable *runnable, Gate *gate) } // namespace -ExecutorThreadService::ExecutorThreadService(SyncableThreadExecutor &executor) +ExecutorThreadService::ExecutorThreadService(ThreadExecutor &executor) : _executor(executor), _threadId(getThreadId(executor)) { @@ -90,4 +95,51 @@ ExecutorThreadService::wakeup() { _executor.wakeup(); } +SyncableExecutorThreadService::SyncableExecutorThreadService(SyncableThreadExecutor &executor) + : _executor(executor), + _threadId(getThreadId(executor)) +{ +} + +SyncableExecutorThreadService::~SyncableExecutorThreadService() = default; + +void +SyncableExecutorThreadService::run(Runnable &runnable) +{ + if (isCurrentThread()) { + runnable.run(); + } else { + Gate gate; + _executor.execute(makeLambdaTask([runnablePtr=&runnable, gatePtr=&gate] { runRunnable(runnablePtr, gatePtr); })); + gate.await(); + } +} + +bool +SyncableExecutorThreadService::isCurrentThread() const +{ + FastOS_ThreadId currentThreadId = FastOS_Thread::GetCurrentThreadId(); + return FastOS_Thread::CompareThreadIds(_threadId->_id, currentThreadId); +} + +vespalib::ExecutorStats +SyncableExecutorThreadService::getStats() { + return _executor.getStats(); +} + +void +SyncableExecutorThreadService::setTaskLimit(uint32_t taskLimit) { + _executor.setTaskLimit(taskLimit); +} + +uint32_t +SyncableExecutorThreadService::getTaskLimit() const { + return _executor.getTaskLimit(); +} + +void +SyncableExecutorThreadService::wakeup() { + _executor.wakeup(); +} + } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.h b/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.h index 44a330ca696..7298b81611a 100644 --- a/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.h +++ b/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.h @@ -14,11 +14,11 @@ namespace internal { struct ThreadId; } class ExecutorThreadService : public searchcorespi::index::IThreadService { private: - vespalib::SyncableThreadExecutor &_executor; + vespalib::ThreadExecutor &_executor; std::unique_ptr<internal::ThreadId> _threadId; public: - ExecutorThreadService(vespalib::SyncableThreadExecutor &executor); + ExecutorThreadService(vespalib::ThreadExecutor &executor); ~ExecutorThreadService(); vespalib::ExecutorStats getStats() override; @@ -27,14 +27,36 @@ public: return _executor.execute(std::move(task)); } void run(vespalib::Runnable &runnable) override; + + bool isCurrentThread() const override; + size_t getNumThreads() const override { return _executor.getNumThreads(); } + + void setTaskLimit(uint32_t taskLimit) override; + uint32_t getTaskLimit() const override; + void wakeup() override; +}; + +class SyncableExecutorThreadService : public searchcorespi::index::ISyncableThreadService +{ +private: + vespalib::SyncableThreadExecutor &_executor; + std::unique_ptr<internal::ThreadId> _threadId; + +public: + SyncableExecutorThreadService(vespalib::SyncableThreadExecutor &executor); + ~SyncableExecutorThreadService(); + + vespalib::ExecutorStats getStats() override; + + vespalib::Executor::Task::UP execute(vespalib::Executor::Task::UP task) override { + return _executor.execute(std::move(task)); + } + void run(vespalib::Runnable &runnable) override; vespalib::Syncable &sync() override { _executor.sync(); return *this; } - ExecutorThreadService & shutdown() override { - _executor.shutdown(); - return *this; - } + bool isCurrentThread() const override; size_t getNumThreads() const override { return _executor.getNumThreads(); } diff --git a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp index 6c537b1beeb..86530f235fd 100644 --- a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp @@ -54,7 +54,6 @@ ExecutorThreadingService::ExecutorThreadingService(vespalib::ThreadExecutor& sha _summaryExecutor(createExecutorWithOneThread(stackSize, cfg.defaultTaskLimit(), cfg.optimize(), summary_executor)), _masterService(_masterExecutor), _indexService(*_indexExecutor), - _summaryService(*_summaryExecutor), _indexFieldInverter(), _indexFieldWriter(), _attributeFieldWriter(), diff --git a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h index b3d05021ab5..2a4c57ef57d 100644 --- a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h +++ b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h @@ -23,9 +23,8 @@ private: std::atomic<uint32_t> _master_task_limit; std::unique_ptr<vespalib::SyncableThreadExecutor> _indexExecutor; std::unique_ptr<vespalib::SyncableThreadExecutor> _summaryExecutor; - ExecutorThreadService _masterService; + SyncableExecutorThreadService _masterService; ExecutorThreadService _indexService; - ExecutorThreadService _summaryService; std::unique_ptr<vespalib::ISequencedTaskExecutor> _indexFieldInverter; std::unique_ptr<vespalib::ISequencedTaskExecutor> _indexFieldWriter; std::unique_ptr<vespalib::ISequencedTaskExecutor> _attributeFieldWriter; @@ -60,7 +59,7 @@ public: uint32_t field_task_limit, uint32_t summary_task_limit); - searchcorespi::index::IThreadService &master() override { + searchcorespi::index::ISyncableThreadService &master() override { return _masterService; } searchcorespi::index::IThreadService &index() override { diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp index 6b606298026..0d75464a161 100644 --- a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp @@ -39,7 +39,7 @@ isRunnable(const MaintenanceJobRunner & job, const Executor * master) { } -MaintenanceController::MaintenanceController(IThreadService &masterThread, +MaintenanceController::MaintenanceController(ISyncableThreadService &masterThread, vespalib::Executor & defaultExecutor, MonitoredRefCount & refCount, const DocTypeName &docTypeName) @@ -140,6 +140,11 @@ MaintenanceController::stop() _masterThread.sync(); // Wait for already scheduled maintenance jobs and performHoldJobs } +searchcorespi::index::IThreadService & +MaintenanceController::masterThread() { + return _masterThread; +} + void MaintenanceController::kill() { diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h index 8c8cc3e2d43..f2c425b2fd0 100644 --- a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h +++ b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.h @@ -17,7 +17,10 @@ class MonitoredRefCount; class Timer; } -namespace searchcorespi::index { struct IThreadService; } +namespace searchcorespi::index { + struct IThreadService; + struct ISyncableThreadService; +} namespace proton { @@ -33,12 +36,13 @@ class MaintenanceController { public: using IThreadService = searchcorespi::index::IThreadService; + using ISyncableThreadService = searchcorespi::index::ISyncableThreadService; using DocumentDBMaintenanceConfigSP = std::shared_ptr<DocumentDBMaintenanceConfig>; using JobList = std::vector<std::shared_ptr<MaintenanceJobRunner>>; using UP = std::unique_ptr<MaintenanceController>; enum class State {INITIALIZING, STARTED, PAUSED, STOPPING}; - MaintenanceController(IThreadService &masterThread, vespalib::Executor & defaultExecutor, vespalib::MonitoredRefCount & refCount, const DocTypeName &docTypeName); + MaintenanceController(ISyncableThreadService &masterThread, vespalib::Executor & defaultExecutor, vespalib::MonitoredRefCount & refCount, const DocTypeName &docTypeName); ~MaintenanceController(); void registerJobInMasterThread(IMaintenanceJob::UP job); @@ -70,14 +74,14 @@ public: const MaintenanceDocumentSubDB & getReadySubDB() const { return _readySubDB; } const MaintenanceDocumentSubDB & getRemSubDB() const { return _remSubDB; } const MaintenanceDocumentSubDB & getNotReadySubDB() const { return _notReadySubDB; } - IThreadService & masterThread() { return _masterThread; } + IThreadService & masterThread(); const DocTypeName & getDocTypeName() const { return _docTypeName; } vespalib::RetainGuard retainDB() { return vespalib::RetainGuard(_refCount); } private: using Mutex = std::mutex; using Guard = std::lock_guard<Mutex>; - IThreadService &_masterThread; + ISyncableThreadService &_masterThread; vespalib::Executor &_defaultExecutor; vespalib::MonitoredRefCount &_refCount; MaintenanceDocumentSubDB _readySubDB; diff --git a/searchcore/src/vespa/searchcore/proton/server/proton_configurer.h b/searchcore/src/vespa/searchcore/proton/server/proton_configurer.h index 829da0756f8..3ebfbc378d7 100644 --- a/searchcore/src/vespa/searchcore/proton/server/proton_configurer.h +++ b/searchcore/src/vespa/searchcore/proton/server/proton_configurer.h @@ -27,7 +27,7 @@ class ProtonConfigurer : public IProtonConfigurer using DocumentDBs = std::map<DocTypeName, std::pair<std::weak_ptr<IDocumentDBConfigOwner>, std::weak_ptr<DocumentDBDirectoryHolder>>>; using InitializeThreads = std::shared_ptr<vespalib::SyncableThreadExecutor>; - ExecutorThreadService _executor; + SyncableExecutorThreadService _executor; IProtonConfigurerOwner &_owner; DocumentDBs _documentDBs; std::shared_ptr<ProtonConfigSnapshot> _pendingConfigSnapshot; diff --git a/searchcore/src/vespa/searchcore/proton/test/thread_service_observer.h b/searchcore/src/vespa/searchcore/proton/test/thread_service_observer.h index 5b679da7b51..0f199e10cb1 100644 --- a/searchcore/src/vespa/searchcore/proton/test/thread_service_observer.h +++ b/searchcore/src/vespa/searchcore/proton/test/thread_service_observer.h @@ -66,14 +66,56 @@ public: void run(vespalib::Runnable &runnable) override { _service.run(runnable); } + + bool isCurrentThread() const override { + return _service.isCurrentThread(); + } + size_t getNumThreads() const override { return _service.getNumThreads(); } + + vespalib::ExecutorStats getStats() override { + return _service.getStats(); + } + + void setTaskLimit(uint32_t taskLimit) override { + _service.setTaskLimit(taskLimit); + } + + uint32_t getTaskLimit() const override { + return _service.getTaskLimit(); + } + + void wakeup() override { + _service.wakeup(); + } +}; + +class SyncableThreadServiceObserver : public searchcorespi::index::ISyncableThreadService +{ +private: + searchcorespi::index::ISyncableThreadService &_service; + uint32_t _executeCnt; + +public: + SyncableThreadServiceObserver(searchcorespi::index::ISyncableThreadService &service) + : _service(service), + _executeCnt(0) + { + } + + uint32_t getExecuteCnt() const { return _executeCnt; } + + vespalib::Executor::Task::UP execute(vespalib::Executor::Task::UP task) override { + ++_executeCnt; + return _service.execute(std::move(task)); + } + void run(vespalib::Runnable &runnable) override { + _service.run(runnable); + } vespalib::Syncable &sync() override { _service.sync(); return *this; } - ThreadServiceObserver &shutdown() override { - _service.shutdown(); - return *this; - } + bool isCurrentThread() const override { return _service.isCurrentThread(); } diff --git a/searchcore/src/vespa/searchcore/proton/test/threading_service_observer.h b/searchcore/src/vespa/searchcore/proton/test/threading_service_observer.h index 18489624d1a..e93b1632b3f 100644 --- a/searchcore/src/vespa/searchcore/proton/test/threading_service_observer.h +++ b/searchcore/src/vespa/searchcore/proton/test/threading_service_observer.h @@ -12,10 +12,10 @@ class ThreadingServiceObserver : public searchcorespi::index::IThreadingService { private: searchcorespi::index::IThreadingService &_service; - ThreadServiceObserver _master; - ThreadServiceObserver _index; - ThreadExecutorObserver _summary; - vespalib::ThreadExecutor & _shared; + SyncableThreadServiceObserver _master; + ThreadServiceObserver _index; + ThreadExecutorObserver _summary; + vespalib::ThreadExecutor & _shared; vespalib::SequencedTaskExecutorObserver _indexFieldInverter; vespalib::SequencedTaskExecutorObserver _indexFieldWriter; vespalib::SequencedTaskExecutorObserver _attributeFieldWriter; @@ -23,7 +23,7 @@ private: public: ThreadingServiceObserver(searchcorespi::index::IThreadingService &service); ~ThreadingServiceObserver() override; - const ThreadServiceObserver &masterObserver() const { + const SyncableThreadServiceObserver &masterObserver() const { return _master; } const ThreadServiceObserver &indexObserver() const { @@ -37,7 +37,7 @@ public: _service.blocking_master_execute(std::move(task)); } - searchcorespi::index::IThreadService &master() override { + searchcorespi::index::ISyncableThreadService &master() override { return _master; } searchcorespi::index::IThreadService &index() override { diff --git a/searchcorespi/src/vespa/searchcorespi/index/i_thread_service.h b/searchcorespi/src/vespa/searchcorespi/index/i_thread_service.h index b4e51e2dd1b..f973908b62d 100644 --- a/searchcorespi/src/vespa/searchcorespi/index/i_thread_service.h +++ b/searchcorespi/src/vespa/searchcorespi/index/i_thread_service.h @@ -9,7 +9,7 @@ namespace searchcorespi::index { /** * Interface for a single thread used for write tasks. */ -struct IThreadService : public vespalib::SyncableThreadExecutor +struct IThreadService : public vespalib::ThreadExecutor { IThreadService(const IThreadService &) = delete; IThreadService & operator = (const IThreadService &) = delete; @@ -25,6 +25,9 @@ struct IThreadService : public vespalib::SyncableThreadExecutor * Returns whether the current thread is the underlying thread. */ virtual bool isCurrentThread() const = 0; +}; + +struct ISyncableThreadService : public IThreadService, vespalib::Syncable { }; diff --git a/searchcorespi/src/vespa/searchcorespi/index/ithreadingservice.h b/searchcorespi/src/vespa/searchcorespi/index/ithreadingservice.h index 72362688659..c95a42f601b 100644 --- a/searchcorespi/src/vespa/searchcorespi/index/ithreadingservice.h +++ b/searchcorespi/src/vespa/searchcorespi/index/ithreadingservice.h @@ -69,7 +69,7 @@ struct IThreadingService */ virtual void blocking_master_execute(vespalib::Executor::Task::UP task) = 0; - virtual IThreadService &master() = 0; + virtual ISyncableThreadService &master() = 0; virtual IThreadService &index() = 0; virtual vespalib::ThreadExecutor &summary() = 0; virtual vespalib::ThreadExecutor &shared() = 0; |