diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2021-11-04 16:08:42 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-11-04 16:08:42 +0100 |
commit | 39da265d599c3b302dbce4f6d25103bc4d38418b (patch) | |
tree | 1b21d2fab9e081adbd378524dbf6d2259f410764 /searchcore | |
parent | 23a6aa13e9150a3858f85bf786a674dfcdcf3d89 (diff) | |
parent | 28098c7776ae90a88348c8d0d6daa0ac5e336a11 (diff) |
Merge pull request #19867 from vespa-engine/geirst/support-shared-field-writer-executor-for-documentdb
Add support for using a shared field writer executor inside the threa…
Diffstat (limited to 'searchcore')
8 files changed, 170 insertions, 29 deletions
diff --git a/searchcore/CMakeLists.txt b/searchcore/CMakeLists.txt index f7d1b763117..36c7e386445 100644 --- a/searchcore/CMakeLists.txt +++ b/searchcore/CMakeLists.txt @@ -90,6 +90,7 @@ vespa_define_module( src/tests/proton/documentdb/documentbucketmover src/tests/proton/documentdb/documentdbconfig src/tests/proton/documentdb/documentdbconfigscout + src/tests/proton/documentdb/executor_threading_service src/tests/proton/documentdb/feedhandler src/tests/proton/documentdb/feedview src/tests/proton/documentdb/fileconfigmanager diff --git a/searchcore/src/tests/proton/documentdb/executor_threading_service/CMakeLists.txt b/searchcore/src/tests/proton/documentdb/executor_threading_service/CMakeLists.txt new file mode 100644 index 00000000000..721f2207213 --- /dev/null +++ b/searchcore/src/tests/proton/documentdb/executor_threading_service/CMakeLists.txt @@ -0,0 +1,9 @@ +# Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_executable(searchcore_executor_threading_service_test_app TEST + SOURCES + executor_threading_service_test.cpp + DEPENDS + searchcore_server + GTest::GTest +) +vespa_add_test(NAME searchcore_executor_threading_service_test_app COMMAND searchcore_executor_threading_service_test_app) diff --git a/searchcore/src/tests/proton/documentdb/executor_threading_service/executor_threading_service_test.cpp b/searchcore/src/tests/proton/documentdb/executor_threading_service/executor_threading_service_test.cpp new file mode 100644 index 00000000000..714ffaa16b7 --- /dev/null +++ b/searchcore/src/tests/proton/documentdb/executor_threading_service/executor_threading_service_test.cpp @@ -0,0 +1,79 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/searchcore/proton/server/executorthreadingservice.h> +#include <vespa/searchcore/proton/server/threading_service_config.h> +#include <vespa/vespalib/gtest/gtest.h> +#include <vespa/vespalib/util/sequencedtaskexecutor.h> + +using namespace proton; +using vespalib::ISequencedTaskExecutor; +using vespalib::SequencedTaskExecutor; +using SharedFieldWriterExecutor = ThreadingServiceConfig::SharedFieldWriterExecutor; + + +SequencedTaskExecutor* +to_concrete_type(ISequencedTaskExecutor& exec) +{ + return dynamic_cast<SequencedTaskExecutor*>(&exec); +} + +class ExecutorThreadingServiceTest : public ::testing::Test { +public: + vespalib::ThreadStackExecutor shared_executor; + std::unique_ptr<ExecutorThreadingService> service; + ExecutorThreadingServiceTest() + : shared_executor(1, 1000), + service() + { + } + void setup(uint32_t indexing_threads, SharedFieldWriterExecutor shared_field_writer) { + service = std::make_unique<ExecutorThreadingService>(shared_executor, + ThreadingServiceConfig::make(indexing_threads, shared_field_writer)); + } + SequencedTaskExecutor* index_inverter() { + return to_concrete_type(service->indexFieldInverter()); + } + SequencedTaskExecutor* index_writer() { + return to_concrete_type(service->indexFieldWriter()); + } + SequencedTaskExecutor* attribute_writer() { + return to_concrete_type(service->attributeFieldWriter()); + } +}; + +void +assert_executor(SequencedTaskExecutor* exec, uint32_t exp_executors, uint32_t exp_task_limit) +{ + EXPECT_EQ(exp_executors, exec->getNumExecutors()); + EXPECT_EQ(exp_task_limit, exec->first_executor()->getTaskLimit()); +} + +TEST_F(ExecutorThreadingServiceTest, no_shared_field_writer_executor) +{ + setup(4, SharedFieldWriterExecutor::NONE); + EXPECT_NE(index_inverter(), index_writer()); + EXPECT_NE(index_writer(), attribute_writer()); + assert_executor(index_inverter(), 4, 100); + assert_executor(index_writer(), 4, 100); + assert_executor(attribute_writer(), 4, 100); +} + +TEST_F(ExecutorThreadingServiceTest, shared_executor_for_index_field_writers) +{ + setup(4, SharedFieldWriterExecutor::INDEX); + EXPECT_EQ(index_inverter(), index_writer()); + EXPECT_NE(index_inverter(), attribute_writer()); + assert_executor(index_inverter(), 8, 100); + assert_executor(attribute_writer(), 4, 100); +} + +TEST_F(ExecutorThreadingServiceTest, shared_executor_for_index_and_attribute_field_writers) +{ + setup(4, SharedFieldWriterExecutor::INDEX_AND_ATTRIBUTE); + EXPECT_EQ(index_inverter(), index_writer()); + EXPECT_EQ(index_inverter(), attribute_writer()); + assert_executor(index_inverter(), 12, 100); +} + +GTEST_MAIN_RUN_ALL_TESTS() + diff --git a/searchcore/src/vespa/searchcore/config/proton.def b/searchcore/src/vespa/searchcore/config/proton.def index e1bdf13fd36..6b85f4a6829 100644 --- a/searchcore/src/vespa/searchcore/config/proton.def +++ b/searchcore/src/vespa/searchcore/config/proton.def @@ -493,6 +493,16 @@ hwinfo.cpu.cores int default = 0 restart ## Deprecated -> Use documentdb.feeding.concurrency feeding.concurrency double default = 0.2 restart +## Whether we should use a shared field writer executor in the document db threading service: +## +## NONE: Don't use a shared executor. +## INDEX: Use a shared executor for index field inverter and index field writer. +## INDEX_AND_ATTRIBUTE: Use a shared executor for index field inverter, index field writer, and attribute field writer. +## DOCUMENT_DB: Use a shared executor for index field inverter, index field writer, and attribute field writer among all document dbs. +## +## TODO: Remove this when a shared executor is the default. +feeding.shared_field_writer_executor enum {NONE, INDEX, INDEX_AND_ATTRIBUTE, DOCUMENT_DB} default = NONE restart + ## Adjustment to resource limit when determining if maintenance jobs can run. ## ## Currently used by 'lid_space_compaction' and 'move_buckets' jobs. diff --git a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp index cb4b396d63d..d35aaf9f909 100644 --- a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp @@ -12,6 +12,7 @@ using vespalib::BlockingThreadStackExecutor; using vespalib::SingleExecutor; using vespalib::SequencedTaskExecutor; using OptimizeFor = vespalib::Executor::OptimizeFor; +using SharedFieldWriterExecutor = proton::ThreadingServiceConfig::SharedFieldWriterExecutor; namespace proton { @@ -30,9 +31,10 @@ createExecutorWithOneThread(uint32_t stackSize, uint32_t taskLimit, OptimizeFor VESPA_THREAD_STACK_TAG(master_executor) VESPA_THREAD_STACK_TAG(index_executor) VESPA_THREAD_STACK_TAG(summary_executor) -VESPA_THREAD_STACK_TAG(field_inverter_executor) +VESPA_THREAD_STACK_TAG(index_field_inverter_executor) +VESPA_THREAD_STACK_TAG(index_field_writer_executor) +VESPA_THREAD_STACK_TAG(attribute_field_writer_executor) VESPA_THREAD_STACK_TAG(field_writer_executor) -VESPA_THREAD_STACK_TAG(attribute_executor) } @@ -50,11 +52,38 @@ ExecutorThreadingService::ExecutorThreadingService(vespalib::ThreadExecutor & sh _masterService(_masterExecutor), _indexService(*_indexExecutor), _summaryService(*_summaryExecutor), - _indexFieldInverter(SequencedTaskExecutor::create(field_inverter_executor, cfg.indexingThreads(), cfg.defaultTaskLimit())), - _indexFieldWriter(SequencedTaskExecutor::create(field_writer_executor, cfg.indexingThreads(), cfg.defaultTaskLimit())), - _attributeFieldWriter(SequencedTaskExecutor::create(attribute_executor, cfg.indexingThreads(), cfg.defaultTaskLimit(), - cfg.optimize(), cfg.kindOfwatermark(), cfg.reactionTime())) + _indexFieldInverter(), + _indexFieldWriter(), + _attributeFieldWriter(), + _field_writer(), + _index_field_inverter_ptr(), + _index_field_writer_ptr(), + _attribute_field_writer_ptr() { + if (cfg.shared_field_writer() == SharedFieldWriterExecutor::INDEX) { + _field_writer = SequencedTaskExecutor::create(field_writer_executor, cfg.indexingThreads() * 2, cfg.defaultTaskLimit()); + _attributeFieldWriter = SequencedTaskExecutor::create(attribute_field_writer_executor, cfg.indexingThreads(), cfg.defaultTaskLimit(), + cfg.optimize(), cfg.kindOfwatermark(), cfg.reactionTime()); + _index_field_inverter_ptr = _field_writer.get(); + _index_field_writer_ptr = _field_writer.get(); + _attribute_field_writer_ptr = _attributeFieldWriter.get(); + + } else if (cfg.shared_field_writer() == SharedFieldWriterExecutor::INDEX_AND_ATTRIBUTE) { + _field_writer = SequencedTaskExecutor::create(field_writer_executor, cfg.indexingThreads() * 3, cfg.defaultTaskLimit(), + cfg.optimize(), cfg.kindOfwatermark(), cfg.reactionTime()); + _index_field_inverter_ptr = _field_writer.get(); + _index_field_writer_ptr = _field_writer.get(); + _attribute_field_writer_ptr = _field_writer.get(); + } else { + // TODO: Add support for shared field writer across all document dbs. + _indexFieldInverter = SequencedTaskExecutor::create(index_field_inverter_executor, cfg.indexingThreads(), cfg.defaultTaskLimit()); + _indexFieldWriter = SequencedTaskExecutor::create(index_field_writer_executor, cfg.indexingThreads(), cfg.defaultTaskLimit()); + _attributeFieldWriter = SequencedTaskExecutor::create(attribute_field_writer_executor, cfg.indexingThreads(), cfg.defaultTaskLimit(), + cfg.optimize(), cfg.kindOfwatermark(), cfg.reactionTime()); + _index_field_inverter_ptr = _indexFieldInverter.get(); + _index_field_writer_ptr = _indexFieldWriter.get(); + _attribute_field_writer_ptr = _attributeFieldWriter.get(); + } } ExecutorThreadingService::~ExecutorThreadingService() = default; @@ -74,11 +103,11 @@ ExecutorThreadingService::syncOnce() { if (!isMasterThread) { _masterExecutor.sync(); } - _attributeFieldWriter->sync_all(); + _attribute_field_writer_ptr->sync_all(); _indexExecutor->sync(); _summaryExecutor->sync(); - _indexFieldInverter->sync_all(); - _indexFieldWriter->sync_all(); + _index_field_inverter_ptr->sync_all(); + _index_field_writer_ptr->sync_all(); if (!isMasterThread) { _masterExecutor.sync(); } @@ -89,13 +118,13 @@ ExecutorThreadingService::shutdown() { _masterExecutor.shutdown(); _masterExecutor.sync(); - _attributeFieldWriter->sync_all(); + _attribute_field_writer_ptr->sync_all(); _summaryExecutor->shutdown(); _summaryExecutor->sync(); _indexExecutor->shutdown(); _indexExecutor->sync(); - _indexFieldInverter->sync_all(); - _indexFieldWriter->sync_all(); + _index_field_inverter_ptr->sync_all(); + _index_field_writer_ptr->sync_all(); } void @@ -103,9 +132,9 @@ ExecutorThreadingService::setTaskLimit(uint32_t taskLimit, uint32_t summaryTaskL { _indexExecutor->setTaskLimit(taskLimit); _summaryExecutor->setTaskLimit(summaryTaskLimit); - _indexFieldInverter->setTaskLimit(taskLimit); - _indexFieldWriter->setTaskLimit(taskLimit); - _attributeFieldWriter->setTaskLimit(taskLimit); + _index_field_inverter_ptr->setTaskLimit(taskLimit); + _index_field_writer_ptr->setTaskLimit(taskLimit); + _attribute_field_writer_ptr->setTaskLimit(taskLimit); } ExecutorThreadingServiceStats @@ -115,24 +144,24 @@ ExecutorThreadingService::getStats() _indexExecutor->getStats(), _summaryExecutor->getStats(), _sharedExecutor.getStats(), - _indexFieldInverter->getStats(), - _indexFieldWriter->getStats(), - _attributeFieldWriter->getStats()); + _index_field_inverter_ptr->getStats(), + _index_field_writer_ptr->getStats(), + _attribute_field_writer_ptr->getStats()); } vespalib::ISequencedTaskExecutor & ExecutorThreadingService::indexFieldInverter() { - return *_indexFieldInverter; + return *_index_field_inverter_ptr; } vespalib::ISequencedTaskExecutor & ExecutorThreadingService::indexFieldWriter() { - return *_indexFieldWriter; + return *_index_field_writer_ptr; } vespalib::ISequencedTaskExecutor & ExecutorThreadingService::attributeFieldWriter() { - return *_attributeFieldWriter; + return *_attribute_field_writer_ptr; } } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h index ed34e518114..51da27586f7 100644 --- a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h +++ b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h @@ -27,6 +27,10 @@ private: std::unique_ptr<vespalib::ISequencedTaskExecutor> _indexFieldInverter; std::unique_ptr<vespalib::ISequencedTaskExecutor> _indexFieldWriter; std::unique_ptr<vespalib::ISequencedTaskExecutor> _attributeFieldWriter; + std::unique_ptr<vespalib::ISequencedTaskExecutor> _field_writer; + vespalib::ISequencedTaskExecutor* _index_field_inverter_ptr; + vespalib::ISequencedTaskExecutor* _index_field_writer_ptr; + vespalib::ISequencedTaskExecutor* _attribute_field_writer_ptr; void syncOnce(); public: diff --git a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp index e2d3c4a2366..012d91cb49f 100644 --- a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp @@ -14,12 +14,14 @@ ThreadingServiceConfig::ThreadingServiceConfig(uint32_t indexingThreads_, uint32_t defaultTaskLimit_, OptimizeFor optimize_, uint32_t kindOfWatermark_, - vespalib::duration reactionTime_) + vespalib::duration reactionTime_, + SharedFieldWriterExecutor shared_field_writer_) : _indexingThreads(indexingThreads_), _defaultTaskLimit(defaultTaskLimit_), _optimize(optimize_), _kindOfWatermark(kindOfWatermark_), - _reactionTime(reactionTime_) + _reactionTime(reactionTime_), + _shared_field_writer(shared_field_writer_) { } @@ -60,12 +62,13 @@ ThreadingServiceConfig::make(const ProtonConfig &cfg, double concurrency, const return ThreadingServiceConfig(indexingThreads, cfg.indexing.tasklimit, selectOptimization(cfg.indexing.optimize), cfg.indexing.kindOfWatermark, - vespalib::from_s(cfg.indexing.reactiontime)); + vespalib::from_s(cfg.indexing.reactiontime), + cfg.feeding.sharedFieldWriterExecutor); } ThreadingServiceConfig -ThreadingServiceConfig::make(uint32_t indexingThreads) { - return ThreadingServiceConfig(indexingThreads, 100, OptimizeFor::LATENCY, 0, 10ms); +ThreadingServiceConfig::make(uint32_t indexingThreads, SharedFieldWriterExecutor shared_field_writer_) { + return ThreadingServiceConfig(indexingThreads, 100, OptimizeFor::LATENCY, 0, 10ms, shared_field_writer_); } void @@ -81,7 +84,8 @@ ThreadingServiceConfig::operator==(const ThreadingServiceConfig &rhs) const _defaultTaskLimit == rhs._defaultTaskLimit && _optimize == rhs._optimize && _kindOfWatermark == rhs._kindOfWatermark && - _reactionTime == rhs._reactionTime; + _reactionTime == rhs._reactionTime && + _shared_field_writer == rhs._shared_field_writer; } } diff --git a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h index d945e9911fc..5869eaf9c2e 100644 --- a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h +++ b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h @@ -1,6 +1,7 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once +#include <vespa/searchcore/config/config-proton.h> #include <vespa/searchcore/proton/common/hw_info.h> #include <vespa/vespalib/util/executor.h> #include <vespa/vespalib/util/time.h> @@ -16,6 +17,7 @@ class ThreadingServiceConfig { public: using ProtonConfig = const vespa::config::search::core::internal::InternalProtonType; using OptimizeFor = vespalib::Executor::OptimizeFor; + using SharedFieldWriterExecutor = ProtonConfig::Feeding::SharedFieldWriterExecutor; private: uint32_t _indexingThreads; @@ -23,19 +25,22 @@ private: OptimizeFor _optimize; uint32_t _kindOfWatermark; vespalib::duration _reactionTime; // Maximum reaction time to new tasks + SharedFieldWriterExecutor _shared_field_writer; private: - ThreadingServiceConfig(uint32_t indexingThreads_, uint32_t defaultTaskLimit_, OptimizeFor optimize, uint32_t kindOfWatermark, vespalib::duration reactionTime); + ThreadingServiceConfig(uint32_t indexingThreads_, uint32_t defaultTaskLimit_, OptimizeFor optimize_, + uint32_t kindOfWatermark_, vespalib::duration reactionTime_, SharedFieldWriterExecutor shared_field_writer_); public: static ThreadingServiceConfig make(const ProtonConfig &cfg, double concurrency, const HwInfo::Cpu &cpuInfo); - static ThreadingServiceConfig make(uint32_t indexingThreads); + static ThreadingServiceConfig make(uint32_t indexingThreads, SharedFieldWriterExecutor shared_field_writer_ = SharedFieldWriterExecutor::NONE); void update(const ThreadingServiceConfig& cfg); uint32_t indexingThreads() const { return _indexingThreads; } uint32_t defaultTaskLimit() const { return _defaultTaskLimit; } OptimizeFor optimize() const { return _optimize; } uint32_t kindOfwatermark() const { return _kindOfWatermark; } vespalib::duration reactionTime() const { return _reactionTime; } + SharedFieldWriterExecutor shared_field_writer() const { return _shared_field_writer; } bool operator==(const ThreadingServiceConfig &rhs) const; }; |