diff options
author | Geir Storli <geirst@yahooinc.com> | 2021-11-29 12:56:46 +0000 |
---|---|---|
committer | Geir Storli <geirst@yahooinc.com> | 2021-11-29 12:56:46 +0000 |
commit | 18ba82682f0a9588f92f911ea4774d44d484f589 (patch) | |
tree | 5b86852de461dc3fecfec0ab378d8db6e1152c8d | |
parent | 3c4983f0109da80fefc643e43d12798ccf38fb80 (diff) |
Add support for using a shared field writer executor among all document dbs.
This is currently controlled with a feature flag setting a proton config to turn it on.
20 files changed, 192 insertions, 72 deletions
diff --git a/searchcore/src/tests/proton/documentdb/executor_threading_service/executor_threading_service_test.cpp b/searchcore/src/tests/proton/documentdb/executor_threading_service/executor_threading_service_test.cpp index 32707f8a69f..4629ebec854 100644 --- a/searchcore/src/tests/proton/documentdb/executor_threading_service/executor_threading_service_test.cpp +++ b/searchcore/src/tests/proton/documentdb/executor_threading_service/executor_threading_service_test.cpp @@ -10,6 +10,7 @@ using vespalib::ISequencedTaskExecutor; using vespalib::SequencedTaskExecutor; using SharedFieldWriterExecutor = ThreadingServiceConfig::SharedFieldWriterExecutor; +VESPA_THREAD_STACK_TAG(my_field_writer_executor) SequencedTaskExecutor* to_concrete_type(ISequencedTaskExecutor& exec) @@ -20,14 +21,17 @@ to_concrete_type(ISequencedTaskExecutor& exec) class ExecutorThreadingServiceTest : public ::testing::Test { public: vespalib::ThreadStackExecutor shared_executor; + std::unique_ptr<ISequencedTaskExecutor> field_writer_executor; std::unique_ptr<ExecutorThreadingService> service; ExecutorThreadingServiceTest() : shared_executor(1, 1000), + field_writer_executor(SequencedTaskExecutor::create(my_field_writer_executor, 3, 200)), service() { } void setup(uint32_t indexing_threads, SharedFieldWriterExecutor shared_field_writer) { service = std::make_unique<ExecutorThreadingService>(shared_executor, + field_writer_executor.get(), ThreadingServiceConfig::make(indexing_threads, shared_field_writer)); } SequencedTaskExecutor* index_inverter() { @@ -39,6 +43,9 @@ public: SequencedTaskExecutor* attribute_writer() { return to_concrete_type(service->attributeFieldWriter()); } + SequencedTaskExecutor* field_writer() { + return to_concrete_type(*field_writer_executor); + } }; void @@ -75,6 +82,15 @@ TEST_F(ExecutorThreadingServiceTest, shared_executor_for_index_and_attribute_fie assert_executor(index_inverter(), 12, 100); } +TEST_F(ExecutorThreadingServiceTest, shared_field_writer_specified_from_the_outside) +{ + setup(4, SharedFieldWriterExecutor::DOCUMENT_DB); + EXPECT_EQ(field_writer(), index_inverter()); + EXPECT_EQ(field_writer(), index_writer()); + EXPECT_EQ(field_writer(), attribute_writer()); + assert_executor(field_writer(), 3, 200); +} + TEST_F(ExecutorThreadingServiceTest, tasks_limits_can_be_updated) { setup(4, SharedFieldWriterExecutor::NONE); diff --git a/searchcore/src/tests/proton/server/shared_threading_service/shared_threading_service_test.cpp b/searchcore/src/tests/proton/server/shared_threading_service/shared_threading_service_test.cpp index faf64af17e1..e90bfc8ae57 100644 --- a/searchcore/src/tests/proton/server/shared_threading_service/shared_threading_service_test.cpp +++ b/searchcore/src/tests/proton/server/shared_threading_service/shared_threading_service_test.cpp @@ -1,10 +1,15 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include <vespa/searchcore/config/config-proton.h> +#include <vespa/searchcore/proton/server/shared_threading_service.h> #include <vespa/searchcore/proton/server/shared_threading_service_config.h> +#include <vespa/vespalib/util/isequencedtaskexecutor.h> +#include <vespa/vespalib/util/sequencedtaskexecutor.h> #include <vespa/vespalib/gtest/gtest.h> using namespace proton; +using vespalib::ISequencedTaskExecutor; +using vespalib::SequencedTaskExecutor; using ProtonConfig = vespa::config::search::core::ProtonConfig; using ProtonConfigBuilder = vespa::config::search::core::ProtonConfigBuilder; @@ -18,6 +23,8 @@ make_proton_config(double concurrency) builder.flush.maxconcurrent = 1; builder.feeding.concurrency = concurrency; + builder.feeding.sharedFieldWriterExecutor = ProtonConfig::Feeding::SharedFieldWriterExecutor::DOCUMENT_DB; + builder.indexing.tasklimit = 300; return builder; } @@ -38,4 +45,35 @@ TEST(SharedThreadingServiceConfigTest, shared_threads_are_derived_from_cpu_cores expect_shared_threads(5, 10); } +class SharedThreadingServiceTest : public ::testing::Test { +public: + std::unique_ptr<SharedThreadingService> service; + SharedThreadingServiceTest() + : service() + { + } + void setup(double concurrency, uint32_t cpu_cores) { + service = std::make_unique<SharedThreadingService>( + SharedThreadingServiceConfig::make(make_proton_config(concurrency), HwInfo::Cpu(cpu_cores))); + } + SequencedTaskExecutor* field_writer() { + return dynamic_cast<SequencedTaskExecutor*>(service->field_writer()); + } +}; + +void +assert_executor(SequencedTaskExecutor* exec, uint32_t exp_executors, uint32_t exp_task_limit) +{ + EXPECT_EQ(exp_executors, exec->getNumExecutors()); + EXPECT_EQ(exp_task_limit, exec->first_executor()->getTaskLimit()); +} + +TEST_F(SharedThreadingServiceTest, field_writer_can_be_shared_across_all_document_dbs) +{ + setup(0.75, 8); + EXPECT_TRUE(field_writer()); + EXPECT_EQ(6, field_writer()->getNumExecutors()); + EXPECT_EQ(300, field_writer()->first_executor()->getTaskLimit()); +} + GTEST_MAIN_RUN_ALL_TESTS() diff --git a/searchcore/src/vespa/searchcore/proton/metrics/content_proton_metrics.cpp b/searchcore/src/vespa/searchcore/proton/metrics/content_proton_metrics.cpp index 0db3488dc28..d8c42795099 100644 --- a/searchcore/src/vespa/searchcore/proton/metrics/content_proton_metrics.cpp +++ b/searchcore/src/vespa/searchcore/proton/metrics/content_proton_metrics.cpp @@ -11,7 +11,8 @@ ContentProtonMetrics::ProtonExecutorMetrics::ProtonExecutorMetrics(metrics::Metr match("match", this), docsum("docsum", this), shared("shared", this), - warmup("warmup", this) + warmup("warmup", this), + field_writer("field_writer", this) { } diff --git a/searchcore/src/vespa/searchcore/proton/metrics/content_proton_metrics.h b/searchcore/src/vespa/searchcore/proton/metrics/content_proton_metrics.h index 26629d13569..70d3d16cb7c 100644 --- a/searchcore/src/vespa/searchcore/proton/metrics/content_proton_metrics.h +++ b/searchcore/src/vespa/searchcore/proton/metrics/content_proton_metrics.h @@ -26,6 +26,7 @@ struct ContentProtonMetrics : metrics::MetricSet ExecutorMetrics docsum; ExecutorMetrics shared; ExecutorMetrics warmup; + ExecutorMetrics field_writer; ProtonExecutorMetrics(metrics::MetricSet *parent); ~ProtonExecutorMetrics(); diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp index 5d7f841a909..2b2f2422221 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp @@ -175,7 +175,7 @@ DocumentDB::DocumentDB(const vespalib::string &baseDir, _baseDir(baseDir + "/" + _docTypeName.toString()), // Only one thread per executor, or performDropFeedView() will fail. _writeServiceConfig(configSnapshot->get_threading_service_config()), - _writeService(shared_service.shared(), _writeServiceConfig, indexing_thread_stack_size), + _writeService(shared_service.shared(), shared_service.field_writer(), _writeServiceConfig, indexing_thread_stack_size), _initializeThreads(std::move(initializeThreads)), _initConfigSnapshot(), _initConfigSerialNum(0u), diff --git a/searchcore/src/vespa/searchcore/proton/server/executor_explorer_utils.cpp b/searchcore/src/vespa/searchcore/proton/server/executor_explorer_utils.cpp index c044b544675..8b34775b65d 100644 --- a/searchcore/src/vespa/searchcore/proton/server/executor_explorer_utils.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/executor_explorer_utils.cpp @@ -2,17 +2,21 @@ #include "executor_explorer_utils.h" #include <vespa/vespalib/data/slime/cursor.h> +#include <vespa/vespalib/util/adaptive_sequenced_executor.h> #include <vespa/vespalib/util/blockingthreadstackexecutor.h> +#include <vespa/vespalib/util/sequencedtaskexecutor.h> #include <vespa/vespalib/util/singleexecutor.h> #include <vespa/vespalib/util/threadstackexecutor.h> +using vespalib::AdaptiveSequencedExecutor; using vespalib::BlockingThreadStackExecutor; +using vespalib::ISequencedTaskExecutor; +using vespalib::SequencedTaskExecutor; using vespalib::SingleExecutor; using vespalib::ThreadExecutor; using vespalib::ThreadStackExecutor; using vespalib::slime::Cursor; - namespace proton::explorer { namespace { @@ -33,6 +37,32 @@ convert_single_executor_to_slime(const SingleExecutor& executor, Cursor& object) object.setDouble("reaction_time_sec", vespalib::to_s(executor.get_reaction_time())); } +void +set_type(Cursor& object, const vespalib::string& type) +{ + object.setString("type", type); +} + +void +convert_sequenced_executor_to_slime(const SequencedTaskExecutor& executor, Cursor& object) +{ + set_type(object, "SequencedTaskExecutor"); + object.setLong("num_executors", executor.getNumExecutors()); + convert_executor_to_slime(executor.first_executor(), object.setObject("executor")); +} + +void +convert_adaptive_executor_to_slime(const AdaptiveSequencedExecutor& executor, Cursor& object) +{ + set_type(object, "AdaptiveSequencedExecutor"); + object.setLong("num_strands", executor.getNumExecutors()); + auto cfg = executor.get_config(); + object.setLong("num_threads", cfg.num_threads); + object.setLong("max_waiting", cfg.max_waiting); + object.setLong("max_pending", cfg.max_pending); + object.setLong("wakeup_limit", cfg.wakeup_limit); +} + } void @@ -52,5 +82,18 @@ convert_executor_to_slime(const ThreadExecutor* executor, Cursor& object) } } +void +convert_executor_to_slime(const ISequencedTaskExecutor* executor, Cursor& object) +{ + if (const auto* seq = dynamic_cast<const SequencedTaskExecutor*>(executor)) { + convert_sequenced_executor_to_slime(*seq, object); + } else if (const auto* ada = dynamic_cast<const AdaptiveSequencedExecutor*>(executor)) { + convert_adaptive_executor_to_slime(*ada, object); + } else { + set_type(object, "ISequencedTaskExecutor"); + object.setLong("num_executors", executor->getNumExecutors()); + } +} + } diff --git a/searchcore/src/vespa/searchcore/proton/server/executor_explorer_utils.h b/searchcore/src/vespa/searchcore/proton/server/executor_explorer_utils.h index 7ed0d073ded..aa7bfabd00b 100644 --- a/searchcore/src/vespa/searchcore/proton/server/executor_explorer_utils.h +++ b/searchcore/src/vespa/searchcore/proton/server/executor_explorer_utils.h @@ -2,15 +2,23 @@ #pragma once -namespace vespalib { class ThreadExecutor; } +namespace vespalib { +class ISequencedTaskExecutor; +class ThreadExecutor; +} namespace vespalib::slime { struct Cursor; } namespace proton::explorer { /** - * Utility to convert an executor to slime for use with a state explorer. + * Utility to convert a thread executor to slime for use with a state explorer. */ void convert_executor_to_slime(const vespalib::ThreadExecutor* executor, vespalib::slime::Cursor& object); +/** + * Utility to convert a sequenced task executor to slime for use with a state explorer. + */ +void convert_executor_to_slime(const vespalib::ISequencedTaskExecutor* executor, vespalib::slime::Cursor& object); + } diff --git a/searchcore/src/vespa/searchcore/proton/server/executor_threading_service_explorer.cpp b/searchcore/src/vespa/searchcore/proton/server/executor_threading_service_explorer.cpp index daf54eac859..5bbbf1ca57d 100644 --- a/searchcore/src/vespa/searchcore/proton/server/executor_threading_service_explorer.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/executor_threading_service_explorer.cpp @@ -4,61 +4,11 @@ #include "executor_threading_service_explorer.h" #include "executorthreadingservice.h" #include <vespa/vespalib/data/slime/cursor.h> -#include <vespa/vespalib/util/adaptive_sequenced_executor.h> -#include <vespa/vespalib/util/sequencedtaskexecutor.h> - -using vespalib::AdaptiveSequencedExecutor; -using vespalib::ISequencedTaskExecutor; -using vespalib::SequencedTaskExecutor; -using vespalib::slime::Cursor; namespace proton { using explorer::convert_executor_to_slime; -namespace { - -void -set_type(Cursor& object, const vespalib::string& type) -{ - object.setString("type", type); -} - -void -convert_sequenced_executor_to_slime(const SequencedTaskExecutor& executor, Cursor& object) -{ - set_type(object, "SequencedTaskExecutor"); - object.setLong("num_executors", executor.getNumExecutors()); - convert_executor_to_slime(executor.first_executor(), object.setObject("executor")); -} - -void -convert_adaptive_executor_to_slime(const AdaptiveSequencedExecutor& executor, Cursor& object) -{ - set_type(object, "AdaptiveSequencedExecutor"); - object.setLong("num_strands", executor.getNumExecutors()); - auto cfg = executor.get_config(); - object.setLong("num_threads", cfg.num_threads); - object.setLong("max_waiting", cfg.max_waiting); - object.setLong("max_pending", cfg.max_pending); - object.setLong("wakeup_limit", cfg.wakeup_limit); -} - -void -convert_executor_to_slime(const ISequencedTaskExecutor* executor, Cursor& object) -{ - if (const auto* seq = dynamic_cast<const SequencedTaskExecutor*>(executor)) { - convert_sequenced_executor_to_slime(*seq, object); - } else if (const auto* ada = dynamic_cast<const AdaptiveSequencedExecutor*>(executor)) { - convert_adaptive_executor_to_slime(*ada, object); - } else { - set_type(object, "ISequencedTaskExecutor"); - object.setLong("num_executors", executor->getNumExecutors()); - } -} - -} - ExecutorThreadingServiceExplorer::ExecutorThreadingServiceExplorer(searchcorespi::index::IThreadingService& service) : _service(service) { diff --git a/searchcore/src/vespa/searchcore/proton/server/executor_threading_service_explorer.h b/searchcore/src/vespa/searchcore/proton/server/executor_threading_service_explorer.h index 70ed23c6271..f0bb20ab64e 100644 --- a/searchcore/src/vespa/searchcore/proton/server/executor_threading_service_explorer.h +++ b/searchcore/src/vespa/searchcore/proton/server/executor_threading_service_explorer.h @@ -14,10 +14,10 @@ class ExecutorThreadingService; */ class ExecutorThreadingServiceExplorer : public vespalib::StateExplorer { private: - searchcorespi::index::IThreadingService & _service; + searchcorespi::index::IThreadingService& _service; public: - ExecutorThreadingServiceExplorer(searchcorespi::index::IThreadingService & service); + ExecutorThreadingServiceExplorer(searchcorespi::index::IThreadingService& service); ~ExecutorThreadingServiceExplorer(); void get_state(const vespalib::slime::Inserter& inserter, bool full) const override; diff --git a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp index 86530f235fd..52da92ed568 100644 --- a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp @@ -39,10 +39,11 @@ VESPA_THREAD_STACK_TAG(field_writer_executor) } ExecutorThreadingService::ExecutorThreadingService(vespalib::ThreadExecutor &sharedExecutor, uint32_t num_treads) - : ExecutorThreadingService(sharedExecutor, ThreadingServiceConfig::make(num_treads)) + : ExecutorThreadingService(sharedExecutor, nullptr, ThreadingServiceConfig::make(num_treads)) {} ExecutorThreadingService::ExecutorThreadingService(vespalib::ThreadExecutor& sharedExecutor, + vespalib::ISequencedTaskExecutor* field_writer, const ThreadingServiceConfig& cfg, uint32_t stackSize) @@ -76,8 +77,12 @@ ExecutorThreadingService::ExecutorThreadingService(vespalib::ThreadExecutor& sha _index_field_inverter_ptr = _field_writer.get(); _index_field_writer_ptr = _field_writer.get(); _attribute_field_writer_ptr = _field_writer.get(); + } else if (_shared_field_writer == SharedFieldWriterExecutor::DOCUMENT_DB) { + assert(field_writer != nullptr); + _index_field_inverter_ptr = field_writer; + _index_field_writer_ptr = field_writer; + _attribute_field_writer_ptr = field_writer; } else { - // TODO: Add support for shared field writer across all document dbs. _indexFieldInverter = SequencedTaskExecutor::create(index_field_inverter_executor, cfg.indexingThreads(), cfg.defaultTaskLimit()); _indexFieldWriter = SequencedTaskExecutor::create(index_field_writer_executor, cfg.indexingThreads(), cfg.defaultTaskLimit()); _attributeFieldWriter = SequencedTaskExecutor::create(attribute_field_writer_executor, cfg.indexingThreads(), cfg.defaultTaskLimit(), @@ -143,6 +148,7 @@ ExecutorThreadingService::set_task_limits(uint32_t master_task_limit, _master_task_limit.store(master_task_limit, std::memory_order_release); _indexExecutor->setTaskLimit(field_task_limit); _summaryExecutor->setTaskLimit(summary_task_limit); + // TODO: Move this to a common place when the field writer is always shared. _index_field_inverter_ptr->setTaskLimit(field_task_limit); _index_field_writer_ptr->setTaskLimit(field_task_limit); _attribute_field_writer_ptr->setTaskLimit(field_task_limit); @@ -166,6 +172,11 @@ ExecutorThreadingService::getStats() field_writer_stats, field_writer_stats, field_writer_stats); + } else if (_shared_field_writer == SharedFieldWriterExecutor::DOCUMENT_DB) { + vespalib::ExecutorStats empty_stats; + // In this case the field writer stats are reported at a higher level. + return ExecutorThreadingServiceStats(master_stats, index_stats, summary_stats, + empty_stats, empty_stats, empty_stats); } else { return ExecutorThreadingServiceStats(master_stats, index_stats, summary_stats, _index_field_inverter_ptr->getStats(), diff --git a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h index 2a4c57ef57d..972e0de0ec0 100644 --- a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h +++ b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h @@ -42,6 +42,7 @@ public: ExecutorThreadingService(vespalib::ThreadExecutor& sharedExecutor, uint32_t num_treads = 1); ExecutorThreadingService(vespalib::ThreadExecutor& sharedExecutor, + vespalib::ISequencedTaskExecutor* field_writer, const ThreadingServiceConfig& cfg, uint32_t stackSize = 128 * 1024); ~ExecutorThreadingService() override; diff --git a/searchcore/src/vespa/searchcore/proton/server/i_shared_threading_service.h b/searchcore/src/vespa/searchcore/proton/server/i_shared_threading_service.h index 5145dbec43e..9cd19012223 100644 --- a/searchcore/src/vespa/searchcore/proton/server/i_shared_threading_service.h +++ b/searchcore/src/vespa/searchcore/proton/server/i_shared_threading_service.h @@ -1,7 +1,10 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once -namespace vespalib { class ThreadExecutor; } +namespace vespalib { +class ISequencedTaskExecutor; +class ThreadExecutor; +} namespace proton { @@ -27,6 +30,14 @@ public: * - Writing of data in the document store. */ virtual vespalib::ThreadExecutor& shared() = 0; + + /** + * Returns the sequenced executor used to write index and attribute fields in a document db. + * + * This is a nullptr if the field writer is not shared across all document dbs. + * TODO: Make this a reference when it is always shared. + */ + virtual vespalib::ISequencedTaskExecutor* field_writer() = 0; }; } diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.cpp b/searchcore/src/vespa/searchcore/proton/server/proton.cpp index e056325e0d3..89940fbf367 100644 --- a/searchcore/src/vespa/searchcore/proton/server/proton.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/proton.cpp @@ -38,6 +38,7 @@ #include <vespa/vespalib/util/lambdatask.h> #include <vespa/vespalib/util/mmap_file_allocator_factory.h> #include <vespa/vespalib/util/random.h> +#include <vespa/vespalib/util/sequencedtaskexecutor.h> #include <vespa/vespalib/util/size_literals.h> #ifdef __linux__ #include <malloc.h> @@ -448,6 +449,9 @@ Proton::~Proton() if (_shared_service) { _shared_service->warmup_raw().sync(); _shared_service->shared_raw()->sync(); + if (_shared_service->field_writer()) { + _shared_service->field_writer()->sync_all(); + } } if ( ! _documentDBMap.empty()) { @@ -788,6 +792,9 @@ Proton::updateMetrics(const metrics::MetricLockGuard &) if (_shared_service) { metrics.shared.update(_shared_service->shared().getStats()); metrics.warmup.update(_shared_service->warmup().getStats()); + if (_shared_service->field_writer()) { + metrics.warmup.update(_shared_service->field_writer()->getStats()); + } } } } @@ -944,7 +951,8 @@ Proton::get_child(vespalib::stringref name) const (_summaryEngine) ? &_summaryEngine->get_executor() : nullptr, (_flushEngine) ? &_flushEngine->get_executor() : nullptr, &_executor, - (_shared_service) ? &_shared_service->warmup() : nullptr); + (_shared_service) ? &_shared_service->warmup() : nullptr, + (_shared_service) ? _shared_service->field_writer() : nullptr); } return Explorer_UP(nullptr); } diff --git a/searchcore/src/vespa/searchcore/proton/server/proton_thread_pools_explorer.cpp b/searchcore/src/vespa/searchcore/proton/server/proton_thread_pools_explorer.cpp index 70195980376..06a51b7e661 100644 --- a/searchcore/src/vespa/searchcore/proton/server/proton_thread_pools_explorer.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/proton_thread_pools_explorer.cpp @@ -16,13 +16,15 @@ ProtonThreadPoolsExplorer::ProtonThreadPoolsExplorer(const ThreadExecutor* share const ThreadExecutor* docsum, const ThreadExecutor* flush, const ThreadExecutor* proton, - const ThreadExecutor* warmup) + const ThreadExecutor* warmup, + const vespalib::ISequencedTaskExecutor* field_writer) : _shared(shared), _match(match), _docsum(docsum), _flush(flush), _proton(proton), - _warmup(warmup) + _warmup(warmup), + _field_writer(field_writer) { } @@ -37,6 +39,7 @@ ProtonThreadPoolsExplorer::get_state(const vespalib::slime::Inserter& inserter, convert_executor_to_slime(_flush, object.setObject("flush")); convert_executor_to_slime(_proton, object.setObject("proton")); convert_executor_to_slime(_warmup, object.setObject("warmup")); + convert_executor_to_slime(_field_writer, object.setObject("field_writer")); } } diff --git a/searchcore/src/vespa/searchcore/proton/server/proton_thread_pools_explorer.h b/searchcore/src/vespa/searchcore/proton/server/proton_thread_pools_explorer.h index 76c5fa8cfb0..2cacdd2c336 100644 --- a/searchcore/src/vespa/searchcore/proton/server/proton_thread_pools_explorer.h +++ b/searchcore/src/vespa/searchcore/proton/server/proton_thread_pools_explorer.h @@ -4,7 +4,10 @@ #include <vespa/vespalib/net/state_explorer.h> -namespace vespalib { class ThreadExecutor; } +namespace vespalib { +class ISequencedTaskExecutor; +class ThreadExecutor; +} namespace proton { @@ -19,6 +22,7 @@ private: const vespalib::ThreadExecutor* _flush; const vespalib::ThreadExecutor* _proton; const vespalib::ThreadExecutor* _warmup; + const vespalib::ISequencedTaskExecutor* _field_writer; public: ProtonThreadPoolsExplorer(const vespalib::ThreadExecutor* shared, @@ -26,7 +30,8 @@ public: const vespalib::ThreadExecutor* docsum, const vespalib::ThreadExecutor* flush, const vespalib::ThreadExecutor* proton, - const vespalib::ThreadExecutor* warmup); + const vespalib::ThreadExecutor* warmup, + const vespalib::ISequencedTaskExecutor* field_writer); void get_state(const vespalib::slime::Inserter& inserter, bool full) const override; }; diff --git a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp index 04e775674b4..f9333aa18a5 100644 --- a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp @@ -1,19 +1,34 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "shared_threading_service.h" -#include <vespa/vespalib/util/size_literals.h> #include <vespa/vespalib/util/blockingthreadstackexecutor.h> +#include <vespa/vespalib/util/isequencedtaskexecutor.h> +#include <vespa/vespalib/util/sequencedtaskexecutor.h> +#include <vespa/vespalib/util/size_literals.h> +VESPA_THREAD_STACK_TAG(proton_field_writer_executor) VESPA_THREAD_STACK_TAG(proton_shared_executor) VESPA_THREAD_STACK_TAG(proton_warmup_executor) namespace proton { +using SharedFieldWriterExecutor = ThreadingServiceConfig::ProtonConfig::Feeding::SharedFieldWriterExecutor; + SharedThreadingService::SharedThreadingService(const SharedThreadingServiceConfig& cfg) : _warmup(cfg.warmup_threads(), 128_Ki, proton_warmup_executor), _shared(std::make_shared<vespalib::BlockingThreadStackExecutor>(cfg.shared_threads(), 128_Ki, - cfg.shared_task_limit(), proton_shared_executor)) + cfg.shared_task_limit(), proton_shared_executor)), + _field_writer() { + const auto& fw_cfg = cfg.field_writer_config(); + if (fw_cfg.shared_field_writer() == SharedFieldWriterExecutor::DOCUMENT_DB) { + _field_writer = vespalib::SequencedTaskExecutor::create(proton_field_writer_executor, + fw_cfg.indexingThreads() * 3, + fw_cfg.defaultTaskLimit(), + fw_cfg.optimize(), + fw_cfg.kindOfwatermark(), + fw_cfg.reactionTime()); + } } } diff --git a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.h b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.h index ef0ff31c389..12686e4a3c5 100644 --- a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.h +++ b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.h @@ -16,6 +16,7 @@ class SharedThreadingService : public ISharedThreadingService { private: vespalib::ThreadStackExecutor _warmup; std::shared_ptr<vespalib::SyncableThreadExecutor> _shared; + std::unique_ptr<vespalib::ISequencedTaskExecutor> _field_writer; public: SharedThreadingService(const SharedThreadingServiceConfig& cfg); @@ -25,6 +26,7 @@ public: vespalib::ThreadExecutor& warmup() override { return _warmup; } vespalib::ThreadExecutor& shared() override { return *_shared; } + vespalib::ISequencedTaskExecutor* field_writer() override { return _field_writer.get(); } }; } diff --git a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service_config.cpp b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service_config.cpp index cf62cf3b76c..8a81c3f4388 100644 --- a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service_config.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service_config.cpp @@ -10,10 +10,12 @@ using ProtonConfig = SharedThreadingServiceConfig::ProtonConfig; SharedThreadingServiceConfig::SharedThreadingServiceConfig(uint32_t shared_threads_in, uint32_t shared_task_limit_in, - uint32_t warmup_threads_in) + uint32_t warmup_threads_in, + const ThreadingServiceConfig& field_writer_config_in) : _shared_threads(shared_threads_in), _shared_task_limit(shared_task_limit_in), - _warmup_threads(warmup_threads_in) + _warmup_threads(warmup_threads_in), + _field_writer_config(field_writer_config_in) { } @@ -35,7 +37,8 @@ SharedThreadingServiceConfig::make(const proton::SharedThreadingServiceConfig::P const proton::HwInfo::Cpu& cpu_info) { size_t shared_threads = derive_shared_threads(cfg, cpu_info); - return proton::SharedThreadingServiceConfig(shared_threads, shared_threads * 16, 4); + return proton::SharedThreadingServiceConfig(shared_threads, shared_threads * 16, 4, + ThreadingServiceConfig::make(cfg, cfg.feeding.concurrency, cpu_info)); } } diff --git a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service_config.h b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service_config.h index 02966e0efeb..1214bfa77fa 100644 --- a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service_config.h +++ b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service_config.h @@ -1,6 +1,7 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once +#include "threading_service_config.h" #include <vespa/searchcore/proton/common/hw_info.h> namespace vespa::config::search::core::internal { class InternalProtonType; } @@ -18,18 +19,20 @@ private: uint32_t _shared_threads; uint32_t _shared_task_limit; uint32_t _warmup_threads; + ThreadingServiceConfig _field_writer_config; public: SharedThreadingServiceConfig(uint32_t shared_threads_in, uint32_t shared_task_limit_in, - uint32_t warmup_threads_in); + uint32_t warmup_threads_in, + const ThreadingServiceConfig& field_writer_config_in); static SharedThreadingServiceConfig make(const ProtonConfig& cfg, const HwInfo::Cpu& cpu_info); uint32_t shared_threads() const { return _shared_threads; } uint32_t shared_task_limit() const { return _shared_task_limit; } uint32_t warmup_threads() const { return _warmup_threads; } - + const ThreadingServiceConfig& field_writer_config() const { return _field_writer_config; } }; } diff --git a/searchcore/src/vespa/searchcore/proton/test/mock_shared_threading_service.h b/searchcore/src/vespa/searchcore/proton/test/mock_shared_threading_service.h index f21f43ed5ad..976d75f2571 100644 --- a/searchcore/src/vespa/searchcore/proton/test/mock_shared_threading_service.h +++ b/searchcore/src/vespa/searchcore/proton/test/mock_shared_threading_service.h @@ -18,6 +18,7 @@ public: {} vespalib::ThreadExecutor& warmup() override { return _warmup; } vespalib::ThreadExecutor& shared() override { return _shared; } + vespalib::ISequencedTaskExecutor* field_writer() override { return nullptr; } }; } |