diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2021-11-30 16:16:09 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-11-30 16:16:09 +0100 |
commit | 54aba7abd922c1375f3d4728e65bdc7d959e0451 (patch) | |
tree | a290893bd7b7ad66a392f783010424903b66ecc8 /searchcore | |
parent | afff87721e0f59c972c318855ffd5439247f9299 (diff) | |
parent | aabf1addb8c3693257ef96f3642f0b18b8ab4178 (diff) |
Merge pull request #20294 from vespa-engine/balder/use-invokeservice
Use new InvokerService for doing wakeup calls at regular intervals.
Diffstat (limited to 'searchcore')
10 files changed, 53 insertions, 6 deletions
diff --git a/searchcore/src/tests/proton/documentdb/executor_threading_service/executor_threading_service_test.cpp b/searchcore/src/tests/proton/documentdb/executor_threading_service/executor_threading_service_test.cpp index 4629ebec854..8d7e842bc89 100644 --- a/searchcore/src/tests/proton/documentdb/executor_threading_service/executor_threading_service_test.cpp +++ b/searchcore/src/tests/proton/documentdb/executor_threading_service/executor_threading_service_test.cpp @@ -32,6 +32,7 @@ public: void setup(uint32_t indexing_threads, SharedFieldWriterExecutor shared_field_writer) { service = std::make_unique<ExecutorThreadingService>(shared_executor, field_writer_executor.get(), + nullptr, ThreadingServiceConfig::make(indexing_threads, shared_field_writer)); } SequencedTaskExecutor* index_inverter() { diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp index 2b2f2422221..53bdc356015 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp @@ -175,7 +175,7 @@ DocumentDB::DocumentDB(const vespalib::string &baseDir, _baseDir(baseDir + "/" + _docTypeName.toString()), // Only one thread per executor, or performDropFeedView() will fail. _writeServiceConfig(configSnapshot->get_threading_service_config()), - _writeService(shared_service.shared(), shared_service.field_writer(), _writeServiceConfig, indexing_thread_stack_size), + _writeService(shared_service.shared(), shared_service.field_writer(), &shared_service.invokeService(), _writeServiceConfig, indexing_thread_stack_size), _initializeThreads(std::move(initializeThreads)), _initConfigSnapshot(), _initConfigSerialNum(0u), diff --git a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp index 52da92ed568..bca8e89d69e 100644 --- a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp @@ -39,11 +39,12 @@ VESPA_THREAD_STACK_TAG(field_writer_executor) } ExecutorThreadingService::ExecutorThreadingService(vespalib::ThreadExecutor &sharedExecutor, uint32_t num_treads) - : ExecutorThreadingService(sharedExecutor, nullptr, ThreadingServiceConfig::make(num_treads)) + : ExecutorThreadingService(sharedExecutor, nullptr, nullptr, ThreadingServiceConfig::make(num_treads)) {} ExecutorThreadingService::ExecutorThreadingService(vespalib::ThreadExecutor& sharedExecutor, vespalib::ISequencedTaskExecutor* field_writer, + vespalib::InvokeService * invokerService, const ThreadingServiceConfig& cfg, uint32_t stackSize) @@ -61,12 +62,20 @@ ExecutorThreadingService::ExecutorThreadingService(vespalib::ThreadExecutor& sha _field_writer(), _index_field_inverter_ptr(), _index_field_writer_ptr(), - _attribute_field_writer_ptr() + _attribute_field_writer_ptr(), + _invokeRegistrations() { + if (cfg.optimize() == vespalib::Executor::OptimizeFor::THROUGHPUT && invokerService) { + _invokeRegistrations.push_back(invokerService->registerInvoke([executor=_indexExecutor.get()](){ executor->wakeup();})); + _invokeRegistrations.push_back(invokerService->registerInvoke([executor=_summaryExecutor.get()](){ executor->wakeup();})); + } if (_shared_field_writer == SharedFieldWriterExecutor::INDEX) { _field_writer = SequencedTaskExecutor::create(field_writer_executor, cfg.indexingThreads() * 2, cfg.defaultTaskLimit()); _attributeFieldWriter = SequencedTaskExecutor::create(attribute_field_writer_executor, cfg.indexingThreads(), cfg.defaultTaskLimit(), cfg.optimize(), cfg.kindOfwatermark(), cfg.reactionTime()); + if (cfg.optimize() == vespalib::Executor::OptimizeFor::THROUGHPUT && invokerService) { + _invokeRegistrations.push_back(invokerService->registerInvoke([executor=_attributeFieldWriter.get()](){ executor->wakeup();})); + } _index_field_inverter_ptr = _field_writer.get(); _index_field_writer_ptr = _field_writer.get(); _attribute_field_writer_ptr = _attributeFieldWriter.get(); @@ -74,6 +83,9 @@ ExecutorThreadingService::ExecutorThreadingService(vespalib::ThreadExecutor& sha } else if (_shared_field_writer == SharedFieldWriterExecutor::INDEX_AND_ATTRIBUTE) { _field_writer = SequencedTaskExecutor::create(field_writer_executor, cfg.indexingThreads() * 3, cfg.defaultTaskLimit(), cfg.optimize(), cfg.kindOfwatermark(), cfg.reactionTime()); + if (cfg.optimize() == vespalib::Executor::OptimizeFor::THROUGHPUT && invokerService) { + _invokeRegistrations.push_back(invokerService->registerInvoke([executor=_field_writer.get()](){ executor->wakeup();})); + } _index_field_inverter_ptr = _field_writer.get(); _index_field_writer_ptr = _field_writer.get(); _attribute_field_writer_ptr = _field_writer.get(); @@ -87,6 +99,9 @@ ExecutorThreadingService::ExecutorThreadingService(vespalib::ThreadExecutor& sha _indexFieldWriter = SequencedTaskExecutor::create(index_field_writer_executor, cfg.indexingThreads(), cfg.defaultTaskLimit()); _attributeFieldWriter = SequencedTaskExecutor::create(attribute_field_writer_executor, cfg.indexingThreads(), cfg.defaultTaskLimit(), cfg.optimize(), cfg.kindOfwatermark(), cfg.reactionTime()); + if (cfg.optimize() == vespalib::Executor::OptimizeFor::THROUGHPUT && invokerService) { + _invokeRegistrations.push_back(invokerService->registerInvoke([executor=_attributeFieldWriter.get()](){ executor->wakeup();})); + } _index_field_inverter_ptr = _indexFieldInverter.get(); _index_field_writer_ptr = _indexFieldWriter.get(); _attribute_field_writer_ptr = _attributeFieldWriter.get(); diff --git a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h index 972e0de0ec0..e55e95c6745 100644 --- a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h +++ b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h @@ -5,6 +5,7 @@ #include "threading_service_config.h" #include <vespa/searchcorespi/index/ithreadingservice.h> #include <vespa/vespalib/util/threadstackexecutor.h> +#include <vespa/vespalib/util/invokeservice.h> namespace proton { @@ -17,6 +18,7 @@ class ExecutorThreadingServiceStats; class ExecutorThreadingService : public searchcorespi::index::IThreadingService { private: + using Registration = std::unique_ptr<vespalib::IDestructorCallback>; vespalib::ThreadExecutor & _sharedExecutor; vespalib::ThreadStackExecutor _masterExecutor; ThreadingServiceConfig::SharedFieldWriterExecutor _shared_field_writer; @@ -32,6 +34,7 @@ private: vespalib::ISequencedTaskExecutor* _index_field_inverter_ptr; vespalib::ISequencedTaskExecutor* _index_field_writer_ptr; vespalib::ISequencedTaskExecutor* _attribute_field_writer_ptr; + std::vector<Registration> _invokeRegistrations; void syncOnce(); public: @@ -43,6 +46,7 @@ public: ExecutorThreadingService(vespalib::ThreadExecutor& sharedExecutor, vespalib::ISequencedTaskExecutor* field_writer, + vespalib::InvokeService * invokeService, const ThreadingServiceConfig& cfg, uint32_t stackSize = 128 * 1024); ~ExecutorThreadingService() override; diff --git a/searchcore/src/vespa/searchcore/proton/server/i_shared_threading_service.h b/searchcore/src/vespa/searchcore/proton/server/i_shared_threading_service.h index 9cd19012223..dfa48cb8d1a 100644 --- a/searchcore/src/vespa/searchcore/proton/server/i_shared_threading_service.h +++ b/searchcore/src/vespa/searchcore/proton/server/i_shared_threading_service.h @@ -4,6 +4,7 @@ namespace vespalib { class ISequencedTaskExecutor; class ThreadExecutor; +class InvokeService; } namespace proton { @@ -38,6 +39,11 @@ public: * TODO: Make this a reference when it is always shared. */ virtual vespalib::ISequencedTaskExecutor* field_writer() = 0; + + /** + * Returns an InvokeService intended for regular wakeup calls. + */ + virtual vespalib::InvokeService & invokeService() = 0; }; } diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.cpp b/searchcore/src/vespa/searchcore/proton/server/proton.cpp index 30a23aaa3d5..0bcbbc14650 100644 --- a/searchcore/src/vespa/searchcore/proton/server/proton.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/proton.cpp @@ -40,6 +40,7 @@ #include <vespa/vespalib/util/random.h> #include <vespa/vespalib/util/sequencedtaskexecutor.h> #include <vespa/vespalib/util/size_literals.h> +#include <vespa/vespalib/util/invokeserviceimpl.h> #ifdef __linux__ #include <malloc.h> #endif diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.h b/searchcore/src/vespa/searchcore/proton/server/proton.h index 91635dc7497..c18737f22b5 100644 --- a/searchcore/src/vespa/searchcore/proton/server/proton.h +++ b/searchcore/src/vespa/searchcore/proton/server/proton.h @@ -29,7 +29,9 @@ #include <mutex> #include <shared_mutex> -namespace vespalib { class StateServer; } +namespace vespalib { + class StateServer; +} namespace search::transactionlog { class TransLogServerApp; } namespace metrics { class MetricLockGuard; } namespace storage::spi { struct PersistenceProvider; } @@ -61,6 +63,7 @@ private: using DocumentDBMap = std::map<DocTypeName, DocumentDB::SP>; using InitializeThreads = std::shared_ptr<vespalib::ThreadExecutor>; using BucketSpace = document::BucketSpace; + using InvokeService = vespalib::InvokeService; class ProtonFileHeaderContext : public search::common::FileHeaderContext { diff --git a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp index c4fc79c43fd..fa4771bee1d 100644 --- a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp @@ -5,6 +5,7 @@ #include <vespa/vespalib/util/isequencedtaskexecutor.h> #include <vespa/vespalib/util/sequencedtaskexecutor.h> #include <vespa/vespalib/util/size_literals.h> +#include <vespa/vespalib/util/invokeserviceimpl.h> VESPA_THREAD_STACK_TAG(proton_field_writer_executor) VESPA_THREAD_STACK_TAG(proton_shared_executor) @@ -18,7 +19,9 @@ SharedThreadingService::SharedThreadingService(const SharedThreadingServiceConfi : _warmup(cfg.warmup_threads(), 128_Ki, proton_warmup_executor), _shared(std::make_shared<vespalib::BlockingThreadStackExecutor>(cfg.shared_threads(), 128_Ki, cfg.shared_task_limit(), proton_shared_executor)), - _field_writer() + _field_writer(), + _invokeService(5ms), + _invokeRegistrations() { const auto& fw_cfg = cfg.field_writer_config(); if (fw_cfg.shared_field_writer() == SharedFieldWriterExecutor::DOCUMENT_DB) { @@ -28,6 +31,11 @@ SharedThreadingService::SharedThreadingService(const SharedThreadingServiceConfi fw_cfg.optimize(), fw_cfg.kindOfwatermark(), fw_cfg.reactionTime()); + if (fw_cfg.optimize() == vespalib::Executor::OptimizeFor::THROUGHPUT) { + _invokeRegistrations.push_back(_invokeService.registerInvoke([executor = _field_writer.get()]() { + executor->wakeup(); + })); + } } } diff --git a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.h b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.h index 6006d484c97..cd0e6d71402 100644 --- a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.h +++ b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.h @@ -5,6 +5,7 @@ #include "shared_threading_service_config.h" #include <vespa/vespalib/util/threadstackexecutor.h> #include <vespa/vespalib/util/syncable.h> +#include <vespa/vespalib/util/invokeserviceimpl.h> #include <memory> namespace proton { @@ -14,9 +15,12 @@ namespace proton { */ class SharedThreadingService : public ISharedThreadingService { private: + using Registration = std::unique_ptr<vespalib::IDestructorCallback>; vespalib::ThreadStackExecutor _warmup; std::shared_ptr<vespalib::SyncableThreadExecutor> _shared; std::unique_ptr<vespalib::ISequencedTaskExecutor> _field_writer; + vespalib::InvokeServiceImpl _invokeService; + std::vector<Registration> _invokeRegistrations; public: SharedThreadingService(const SharedThreadingServiceConfig& cfg); @@ -28,6 +32,7 @@ public: vespalib::ThreadExecutor& warmup() override { return _warmup; } vespalib::ThreadExecutor& shared() override { return *_shared; } vespalib::ISequencedTaskExecutor* field_writer() override { return _field_writer.get(); } + vespalib::InvokeService & invokeService() override { return _invokeService; } }; } diff --git a/searchcore/src/vespa/searchcore/proton/test/mock_shared_threading_service.h b/searchcore/src/vespa/searchcore/proton/test/mock_shared_threading_service.h index 976d75f2571..74965c15cd4 100644 --- a/searchcore/src/vespa/searchcore/proton/test/mock_shared_threading_service.h +++ b/searchcore/src/vespa/searchcore/proton/test/mock_shared_threading_service.h @@ -2,6 +2,7 @@ #pragma once #include <vespa/searchcore/proton/server/i_shared_threading_service.h> +#include <vespa/vespalib/util/invokeserviceimpl.h> namespace proton { @@ -9,16 +10,19 @@ class MockSharedThreadingService : public ISharedThreadingService { private: vespalib::ThreadExecutor& _warmup; vespalib::ThreadExecutor& _shared; + vespalib::InvokeServiceImpl _invokeService; public: MockSharedThreadingService(vespalib::ThreadExecutor& warmup_in, vespalib::ThreadExecutor& shared_in) : _warmup(warmup_in), - _shared(shared_in) + _shared(shared_in), + _invokeService(10ms) {} vespalib::ThreadExecutor& warmup() override { return _warmup; } vespalib::ThreadExecutor& shared() override { return _shared; } vespalib::ISequencedTaskExecutor* field_writer() override { return nullptr; } + vespalib::InvokeService & invokeService() override { return _invokeService; } }; } |