diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-04-06 15:06:43 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2020-04-06 15:06:43 +0000 |
commit | ba1dcd889e54af3c16c2122c000cca94ca03d22e (patch) | |
tree | b7e069e930d2812cba79e982f005198ffe7eb727 /searchcore | |
parent | b5882399caf70648f9dba597696d20019e574fea (diff) |
Add control for kind_of_watermark and reaction time.
Diffstat (limited to 'searchcore')
7 files changed, 53 insertions, 28 deletions
diff --git a/searchcore/src/tests/proton/documentdb/configurer/configurer_test.cpp b/searchcore/src/tests/proton/documentdb/configurer/configurer_test.cpp index 14ad82a0aa1..89930a6b1a3 100644 --- a/searchcore/src/tests/proton/documentdb/configurer/configurer_test.cpp +++ b/searchcore/src/tests/proton/documentdb/configurer/configurer_test.cpp @@ -79,7 +79,7 @@ struct ViewPtrs ~ViewPtrs(); }; -ViewPtrs::~ViewPtrs() {} +ViewPtrs::~ViewPtrs() = default; struct ViewSet { diff --git a/searchcore/src/vespa/searchcore/config/proton.def b/searchcore/src/vespa/searchcore/config/proton.def index 9434fe7d7ff..f895bd0c88d 100644 --- a/searchcore/src/vespa/searchcore/config/proton.def +++ b/searchcore/src/vespa/searchcore/config/proton.def @@ -132,6 +132,14 @@ indexing.tasklimit int default=1000 restart ## is 40000 then effective task limit is 10000. indexing.semiunboundtasklimit int default = 40000 restart +## Kind of watermark for when to activate extra manpower +## Utilized if optimize is set to either THROUGHPUT or ADAPTIVE +indexing.kind_of_watermark int default = 0 restart + +## Controls minimum reaction time in seconds if using THROUGHPUT +indexing.reactiontime double default = 0.005 restart + + ## How long a freshly loaded index shall be warmed up ## before being used for serving index.warmup.time double default=0.0 restart diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp index 1ee1b703ea9..a88fe109326 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp @@ -137,11 +137,7 @@ DocumentDB::DocumentDB(const vespalib::string &baseDir, ThreadingServiceConfig::make(protonCfg, findDocumentDB(protonCfg.documentdb, docTypeName.getName())->feeding.concurrency, hwInfo.cpu())), - _writeService(sharedExecutor, - _writeServiceConfig.indexingThreads(), - indexing_thread_stack_size, - _writeServiceConfig.defaultTaskLimit(), - _writeServiceConfig.optimize()), + _writeService(sharedExecutor, _writeServiceConfig, indexing_thread_stack_size), _initializeThreads(std::move(initializeThreads)), _initConfigSnapshot(), _initConfigSerialNum(0u), diff --git a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp index e3657eefdc8..b29dd955ff3 100644 --- a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp @@ -1,6 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "executorthreadingservice.h" +#include "threading_service_config.h" #include <vespa/searchcore/proton/metrics/executor_threading_service_stats.h> #include <vespa/vespalib/util/sequencedtaskexecutor.h> #include <vespa/vespalib/util/singleexecutor.h> @@ -28,20 +29,24 @@ createExecutorWithOneThread(uint32_t stackSize, uint32_t taskLimit, OptimizeFor } +ExecutorThreadingService::ExecutorThreadingService(vespalib::SyncableThreadExecutor &sharedExecutor, uint32_t num_treads) + : ExecutorThreadingService(sharedExecutor, ThreadingServiceConfig::make(num_treads)) +{} + ExecutorThreadingService::ExecutorThreadingService(vespalib::SyncableThreadExecutor & sharedExecutor, - uint32_t threads, uint32_t stackSize, uint32_t taskLimit, - OptimizeFor optimize) + const ThreadingServiceConfig & cfg, uint32_t stackSize) : _sharedExecutor(sharedExecutor), _masterExecutor(1, stackSize), - _indexExecutor(createExecutorWithOneThread(stackSize, taskLimit, optimize)), - _summaryExecutor(createExecutorWithOneThread(stackSize, taskLimit, optimize)), + _indexExecutor(createExecutorWithOneThread(stackSize, cfg.defaultTaskLimit(), cfg.optimize())), + _summaryExecutor(createExecutorWithOneThread(stackSize, cfg.defaultTaskLimit(), cfg.optimize())), _masterService(_masterExecutor), _indexService(*_indexExecutor), _summaryService(*_summaryExecutor), - _indexFieldInverter(SequencedTaskExecutor::create(threads, taskLimit)), - _indexFieldWriter(SequencedTaskExecutor::create(threads, taskLimit)), - _attributeFieldWriter(SequencedTaskExecutor::create(threads, taskLimit, optimize)) + _indexFieldInverter(SequencedTaskExecutor::create(cfg.indexingThreads(), cfg.defaultTaskLimit())), + _indexFieldWriter(SequencedTaskExecutor::create(cfg.indexingThreads(), cfg.defaultTaskLimit())), + _attributeFieldWriter(SequencedTaskExecutor::create(cfg.indexingThreads(), cfg.defaultTaskLimit(), cfg.optimize(), + cfg.kindOfwatermark(), cfg.reactionTime())) { } diff --git a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h index dc446a202ec..280e50aea56 100644 --- a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h +++ b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h @@ -8,6 +8,7 @@ namespace proton { class ExecutorThreadingServiceStats; +class ThreadingServiceConfig; /** * Implementation of IThreadingService using 2 underlying thread stack executors @@ -33,13 +34,11 @@ public: * Constructor. * * @stackSize The size of the stack of the underlying executors. - * @taskLimit The task limit for the index executor. + * @cfg config used to set up all executors. */ ExecutorThreadingService(vespalib::SyncableThreadExecutor &sharedExecutor, - uint32_t threads = 1, - uint32_t stackSize = 128 * 1024, - uint32_t taskLimit = 1000, - OptimizeFor optimize = OptimizeFor::LATENCY); + const ThreadingServiceConfig & cfg, uint32_t stackSize = 128 * 1024); + ExecutorThreadingService(vespalib::SyncableThreadExecutor &sharedExecutor, uint32_t num_treads = 1); ~ExecutorThreadingService() override; /** diff --git a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp index e95920ca606..22eb64acdee 100644 --- a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp @@ -9,14 +9,19 @@ namespace proton { using ProtonConfig = ThreadingServiceConfig::ProtonConfig; using OptimizeFor = vespalib::Executor::OptimizeFor; + ThreadingServiceConfig::ThreadingServiceConfig(uint32_t indexingThreads_, uint32_t defaultTaskLimit_, uint32_t semiUnboundTaskLimit_, - OptimizeFor optimize) + OptimizeFor optimize_, + uint32_t kindOfWatermark_, + vespalib::duration reactionTime_) : _indexingThreads(indexingThreads_), _defaultTaskLimit(defaultTaskLimit_), _semiUnboundTaskLimit(semiUnboundTaskLimit_), - _optimize(optimize) + _optimize(optimize_), + _kindOfWatermark(kindOfWatermark_), + _reactionTime(reactionTime_) { } @@ -52,7 +57,14 @@ ThreadingServiceConfig::make(const ProtonConfig &cfg, double concurrency, const uint32_t indexingThreads = calculateIndexingThreads(cfg.indexing.threads, concurrency, cpuInfo); return ThreadingServiceConfig(indexingThreads, cfg.indexing.tasklimit, (cfg.indexing.semiunboundtasklimit / indexingThreads), - selectOptimization(cfg.indexing.optimize)); + selectOptimization(cfg.indexing.optimize), + cfg.indexing.kindOfWatermark, + vespalib::from_s(cfg.indexing.reactiontime)); +} + +ThreadingServiceConfig +ThreadingServiceConfig::make(uint32_t indexingThreads) { + return ThreadingServiceConfig(indexingThreads, 100, 1000, OptimizeFor::LATENCY, 0, 10ms); } } diff --git a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h index 149215f97dc..d608b49f49e 100644 --- a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h +++ b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h @@ -3,6 +3,7 @@ #include <vespa/searchcore/proton/common/hw_info.h> #include <vespa/vespalib/util/executor.h> +#include <vespa/vespalib/util/time.h> #include <cstdint> namespace vespa::config::search::core::internal { class InternalProtonType; } @@ -17,21 +18,25 @@ public: using OptimizeFor = vespalib::Executor::OptimizeFor; private: - uint32_t _indexingThreads; - uint32_t _defaultTaskLimit; - uint32_t _semiUnboundTaskLimit; - OptimizeFor _optimize; + uint32_t _indexingThreads; + uint32_t _defaultTaskLimit; + uint32_t _semiUnboundTaskLimit; + OptimizeFor _optimize; + uint32_t _kindOfWatermark; + vespalib::duration _reactionTime; // Minimum reaction time to new tasks private: - ThreadingServiceConfig(uint32_t indexingThreads_, uint32_t defaultTaskLimit_, uint32_t semiUnboundTaskLimit_, OptimizeFor optimize); + ThreadingServiceConfig(uint32_t indexingThreads_, uint32_t defaultTaskLimit_, uint32_t semiUnboundTaskLimit_, OptimizeFor optimize, uint32_t kindOfWatermark, vespalib::duration reactionTime); public: static ThreadingServiceConfig make(const ProtonConfig &cfg, double concurrency, const HwInfo::Cpu &cpuInfo); - + static ThreadingServiceConfig make(uint32_t indexingThreads); uint32_t indexingThreads() const { return _indexingThreads; } uint32_t defaultTaskLimit() const { return _defaultTaskLimit; } uint32_t semiUnboundTaskLimit() const { return _semiUnboundTaskLimit; } - OptimizeFor optimize() const { return _optimize;} + OptimizeFor optimize() const { return _optimize; } + uint32_t kindOfwatermark() const { return _kindOfWatermark; } + vespalib::duration reactionTime() const { return _reactionTime; } }; } |