diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2022-01-14 11:54:53 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2022-01-18 14:38:23 +0000 |
commit | aeb3ab8725ee058c65036d582155edc20eff654b (patch) | |
tree | 216f666d0a69da61a0c9053b806e00762319671c /searchcore | |
parent | 68da7d45fc5b6e6686963aed4432107a20c74b1f (diff) |
Wire in control of whether taskLimit is hard.
Diffstat (limited to 'searchcore')
5 files changed, 24 insertions, 10 deletions
diff --git a/searchcore/src/tests/proton/documentdb/threading_service_config/threading_service_config_test.cpp b/searchcore/src/tests/proton/documentdb/threading_service_config/threading_service_config_test.cpp index a59b3e9bc6f..fc8bd474813 100644 --- a/searchcore/src/tests/proton/documentdb/threading_service_config/threading_service_config_test.cpp +++ b/searchcore/src/tests/proton/documentdb/threading_service_config/threading_service_config_test.cpp @@ -14,11 +14,11 @@ using ProtonConfigBuilder = vespa::config::search::core::ProtonConfigBuilder; struct Fixture { ProtonConfig cfg; - Fixture(uint32_t baseLineIndexingThreads = 2, uint32_t master_task_limit = 2000, uint32_t task_limit = 500) + Fixture(uint32_t baseLineIndexingThreads = 2, uint32_t master_task_limit = 2000, int32_t task_limit = 500) : cfg(makeConfig(baseLineIndexingThreads, master_task_limit, task_limit)) { } - ProtonConfig makeConfig(uint32_t baseLineIndexingThreads, uint32_t master_task_limit, uint32_t task_limit) { + ProtonConfig makeConfig(uint32_t baseLineIndexingThreads, uint32_t master_task_limit, int32_t task_limit) { ProtonConfigBuilder builder; builder.indexing.threads = baseLineIndexingThreads; builder.indexing.tasklimit = task_limit; @@ -56,6 +56,15 @@ TEST_F("require that task limits are set", Fixture) auto tcfg = f.make(24); EXPECT_EQUAL(2000u, tcfg.master_task_limit()); EXPECT_EQUAL(500u, tcfg.defaultTaskLimit()); + EXPECT_TRUE(tcfg.is_task_limit_hard()); +} + +TEST_F("require that negative task limit makes it soft", Fixture(2, 3000, -700)) +{ + auto tcfg = f.make(24); + EXPECT_EQUAL(3000u, tcfg.master_task_limit()); + EXPECT_EQUAL(700u, tcfg.defaultTaskLimit()); + EXPECT_FALSE(tcfg.is_task_limit_hard()); } namespace { diff --git a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp index 36c8070f140..47eaef2b6b5 100644 --- a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp @@ -72,7 +72,7 @@ ExecutorThreadingService::ExecutorThreadingService(vespalib::ThreadExecutor& sha if (_shared_field_writer == SharedFieldWriterExecutor::INDEX) { _field_writer = SequencedTaskExecutor::create(field_writer_executor, cfg.indexingThreads() * 2, cfg.defaultTaskLimit()); _attributeFieldWriter = SequencedTaskExecutor::create(attribute_field_writer_executor, cfg.indexingThreads(), cfg.defaultTaskLimit(), - cfg.optimize(), cfg.kindOfwatermark()); + cfg.is_task_limit_hard(), cfg.optimize(), cfg.kindOfwatermark()); if (cfg.optimize() == vespalib::Executor::OptimizeFor::THROUGHPUT && invokerService) { _invokeRegistrations.push_back(invokerService->registerInvoke([executor=_attributeFieldWriter.get()](){ executor->wakeup();})); } @@ -82,7 +82,7 @@ ExecutorThreadingService::ExecutorThreadingService(vespalib::ThreadExecutor& sha } else if (_shared_field_writer == SharedFieldWriterExecutor::INDEX_AND_ATTRIBUTE) { _field_writer = SequencedTaskExecutor::create(field_writer_executor, cfg.indexingThreads() * 3, cfg.defaultTaskLimit(), - cfg.optimize(), cfg.kindOfwatermark()); + cfg.is_task_limit_hard(), cfg.optimize(), cfg.kindOfwatermark()); if (cfg.optimize() == vespalib::Executor::OptimizeFor::THROUGHPUT && invokerService) { _invokeRegistrations.push_back(invokerService->registerInvoke([executor=_field_writer.get()](){ executor->wakeup();})); } @@ -98,7 +98,7 @@ ExecutorThreadingService::ExecutorThreadingService(vespalib::ThreadExecutor& sha _indexFieldInverter = SequencedTaskExecutor::create(index_field_inverter_executor, cfg.indexingThreads(), cfg.defaultTaskLimit()); _indexFieldWriter = SequencedTaskExecutor::create(index_field_writer_executor, cfg.indexingThreads(), cfg.defaultTaskLimit()); _attributeFieldWriter = SequencedTaskExecutor::create(attribute_field_writer_executor, cfg.indexingThreads(), cfg.defaultTaskLimit(), - cfg.optimize(), cfg.kindOfwatermark()); + cfg.is_task_limit_hard(), cfg.optimize(), cfg.kindOfwatermark()); if (cfg.optimize() == vespalib::Executor::OptimizeFor::THROUGHPUT && invokerService) { _invokeRegistrations.push_back(invokerService->registerInvoke([executor=_attributeFieldWriter.get()](){ executor->wakeup();})); } diff --git a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp index e32cd6f5f4e..5a3eaa02d3b 100644 --- a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp @@ -28,6 +28,7 @@ SharedThreadingService::SharedThreadingService(const SharedThreadingServiceConfi _field_writer = vespalib::SequencedTaskExecutor::create(proton_field_writer_executor, fw_cfg.indexingThreads() * 3, fw_cfg.defaultTaskLimit(), + fw_cfg.is_task_limit_hard(), fw_cfg.optimize(), fw_cfg.kindOfwatermark()); if (fw_cfg.optimize() == vespalib::Executor::OptimizeFor::THROUGHPUT) { diff --git a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp index 7982e8a8414..335d5bab8d0 100644 --- a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp @@ -12,14 +12,15 @@ using OptimizeFor = vespalib::Executor::OptimizeFor; ThreadingServiceConfig::ThreadingServiceConfig(uint32_t indexingThreads_, uint32_t master_task_limit_, - uint32_t defaultTaskLimit_, + int32_t defaultTaskLimit_, OptimizeFor optimize_, uint32_t kindOfWatermark_, vespalib::duration reactionTime_, SharedFieldWriterExecutor shared_field_writer_) : _indexingThreads(indexingThreads_), _master_task_limit(master_task_limit_), - _defaultTaskLimit(defaultTaskLimit_), + _defaultTaskLimit(std::abs(defaultTaskLimit_)), + _is_task_limit_hard(defaultTaskLimit_ >= 0), _optimize(optimize_), _kindOfWatermark(kindOfWatermark_), _reactionTime(reactionTime_), @@ -81,6 +82,7 @@ ThreadingServiceConfig::operator==(const ThreadingServiceConfig &rhs) const return _indexingThreads == rhs._indexingThreads && _master_task_limit == rhs._master_task_limit && _defaultTaskLimit == rhs._defaultTaskLimit && + _is_task_limit_hard == rhs._is_task_limit_hard && _optimize == rhs._optimize && _kindOfWatermark == rhs._kindOfWatermark && _reactionTime == rhs._reactionTime && diff --git a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h index f1a4f0525d1..a54c0674263 100644 --- a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h +++ b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h @@ -23,15 +23,16 @@ private: uint32_t _indexingThreads; uint32_t _master_task_limit; uint32_t _defaultTaskLimit; + bool _is_task_limit_hard; OptimizeFor _optimize; uint32_t _kindOfWatermark; vespalib::duration _reactionTime; // Maximum reaction time to new tasks SharedFieldWriterExecutor _shared_field_writer; private: - ThreadingServiceConfig(uint32_t indexingThreads_, uint32_t master_task_limit_, uint32_t defaultTaskLimit_, - OptimizeFor optimize_, uint32_t kindOfWatermark_, vespalib::duration reactionTime_, - SharedFieldWriterExecutor shared_field_writer_); + ThreadingServiceConfig(uint32_t indexingThreads_, uint32_t master_task_limit_, int32_t defaultTaskLimit_, + OptimizeFor optimize_, uint32_t kindOfWatermark_, + vespalib::duration reactionTime_, SharedFieldWriterExecutor shared_field_writer_); public: static ThreadingServiceConfig make(const ProtonConfig &cfg, double concurrency, const HwInfo::Cpu &cpuInfo); @@ -40,6 +41,7 @@ public: uint32_t indexingThreads() const { return _indexingThreads; } uint32_t master_task_limit() const { return _master_task_limit; } uint32_t defaultTaskLimit() const { return _defaultTaskLimit; } + bool is_task_limit_hard() const { return _is_task_limit_hard; } OptimizeFor optimize() const { return _optimize; } uint32_t kindOfwatermark() const { return _kindOfWatermark; } vespalib::duration reactionTime() const { return _reactionTime; } |