summaryrefslogtreecommitdiffstats
path: root/searchcore
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2022-01-14 11:54:53 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2022-01-18 14:38:23 +0000
commitaeb3ab8725ee058c65036d582155edc20eff654b (patch)
tree216f666d0a69da61a0c9053b806e00762319671c /searchcore
parent68da7d45fc5b6e6686963aed4432107a20c74b1f (diff)
Wire in control of whether taskLimit is hard.
Diffstat (limited to 'searchcore')
-rw-r--r--searchcore/src/tests/proton/documentdb/threading_service_config/threading_service_config_test.cpp13
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp6
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp1
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp6
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/threading_service_config.h8
5 files changed, 24 insertions, 10 deletions
diff --git a/searchcore/src/tests/proton/documentdb/threading_service_config/threading_service_config_test.cpp b/searchcore/src/tests/proton/documentdb/threading_service_config/threading_service_config_test.cpp
index a59b3e9bc6f..fc8bd474813 100644
--- a/searchcore/src/tests/proton/documentdb/threading_service_config/threading_service_config_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/threading_service_config/threading_service_config_test.cpp
@@ -14,11 +14,11 @@ using ProtonConfigBuilder = vespa::config::search::core::ProtonConfigBuilder;
struct Fixture {
ProtonConfig cfg;
- Fixture(uint32_t baseLineIndexingThreads = 2, uint32_t master_task_limit = 2000, uint32_t task_limit = 500)
+ Fixture(uint32_t baseLineIndexingThreads = 2, uint32_t master_task_limit = 2000, int32_t task_limit = 500)
: cfg(makeConfig(baseLineIndexingThreads, master_task_limit, task_limit))
{
}
- ProtonConfig makeConfig(uint32_t baseLineIndexingThreads, uint32_t master_task_limit, uint32_t task_limit) {
+ ProtonConfig makeConfig(uint32_t baseLineIndexingThreads, uint32_t master_task_limit, int32_t task_limit) {
ProtonConfigBuilder builder;
builder.indexing.threads = baseLineIndexingThreads;
builder.indexing.tasklimit = task_limit;
@@ -56,6 +56,15 @@ TEST_F("require that task limits are set", Fixture)
auto tcfg = f.make(24);
EXPECT_EQUAL(2000u, tcfg.master_task_limit());
EXPECT_EQUAL(500u, tcfg.defaultTaskLimit());
+ EXPECT_TRUE(tcfg.is_task_limit_hard());
+}
+
+TEST_F("require that negative task limit makes it soft", Fixture(2, 3000, -700))
+{
+ auto tcfg = f.make(24);
+ EXPECT_EQUAL(3000u, tcfg.master_task_limit());
+ EXPECT_EQUAL(700u, tcfg.defaultTaskLimit());
+ EXPECT_FALSE(tcfg.is_task_limit_hard());
}
namespace {
diff --git a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp
index 36c8070f140..47eaef2b6b5 100644
--- a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp
@@ -72,7 +72,7 @@ ExecutorThreadingService::ExecutorThreadingService(vespalib::ThreadExecutor& sha
if (_shared_field_writer == SharedFieldWriterExecutor::INDEX) {
_field_writer = SequencedTaskExecutor::create(field_writer_executor, cfg.indexingThreads() * 2, cfg.defaultTaskLimit());
_attributeFieldWriter = SequencedTaskExecutor::create(attribute_field_writer_executor, cfg.indexingThreads(), cfg.defaultTaskLimit(),
- cfg.optimize(), cfg.kindOfwatermark());
+ cfg.is_task_limit_hard(), cfg.optimize(), cfg.kindOfwatermark());
if (cfg.optimize() == vespalib::Executor::OptimizeFor::THROUGHPUT && invokerService) {
_invokeRegistrations.push_back(invokerService->registerInvoke([executor=_attributeFieldWriter.get()](){ executor->wakeup();}));
}
@@ -82,7 +82,7 @@ ExecutorThreadingService::ExecutorThreadingService(vespalib::ThreadExecutor& sha
} else if (_shared_field_writer == SharedFieldWriterExecutor::INDEX_AND_ATTRIBUTE) {
_field_writer = SequencedTaskExecutor::create(field_writer_executor, cfg.indexingThreads() * 3, cfg.defaultTaskLimit(),
- cfg.optimize(), cfg.kindOfwatermark());
+ cfg.is_task_limit_hard(), cfg.optimize(), cfg.kindOfwatermark());
if (cfg.optimize() == vespalib::Executor::OptimizeFor::THROUGHPUT && invokerService) {
_invokeRegistrations.push_back(invokerService->registerInvoke([executor=_field_writer.get()](){ executor->wakeup();}));
}
@@ -98,7 +98,7 @@ ExecutorThreadingService::ExecutorThreadingService(vespalib::ThreadExecutor& sha
_indexFieldInverter = SequencedTaskExecutor::create(index_field_inverter_executor, cfg.indexingThreads(), cfg.defaultTaskLimit());
_indexFieldWriter = SequencedTaskExecutor::create(index_field_writer_executor, cfg.indexingThreads(), cfg.defaultTaskLimit());
_attributeFieldWriter = SequencedTaskExecutor::create(attribute_field_writer_executor, cfg.indexingThreads(), cfg.defaultTaskLimit(),
- cfg.optimize(), cfg.kindOfwatermark());
+ cfg.is_task_limit_hard(), cfg.optimize(), cfg.kindOfwatermark());
if (cfg.optimize() == vespalib::Executor::OptimizeFor::THROUGHPUT && invokerService) {
_invokeRegistrations.push_back(invokerService->registerInvoke([executor=_attributeFieldWriter.get()](){ executor->wakeup();}));
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp
index e32cd6f5f4e..5a3eaa02d3b 100644
--- a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp
@@ -28,6 +28,7 @@ SharedThreadingService::SharedThreadingService(const SharedThreadingServiceConfi
_field_writer = vespalib::SequencedTaskExecutor::create(proton_field_writer_executor,
fw_cfg.indexingThreads() * 3,
fw_cfg.defaultTaskLimit(),
+ fw_cfg.is_task_limit_hard(),
fw_cfg.optimize(),
fw_cfg.kindOfwatermark());
if (fw_cfg.optimize() == vespalib::Executor::OptimizeFor::THROUGHPUT) {
diff --git a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp
index 7982e8a8414..335d5bab8d0 100644
--- a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp
@@ -12,14 +12,15 @@ using OptimizeFor = vespalib::Executor::OptimizeFor;
ThreadingServiceConfig::ThreadingServiceConfig(uint32_t indexingThreads_,
uint32_t master_task_limit_,
- uint32_t defaultTaskLimit_,
+ int32_t defaultTaskLimit_,
OptimizeFor optimize_,
uint32_t kindOfWatermark_,
vespalib::duration reactionTime_,
SharedFieldWriterExecutor shared_field_writer_)
: _indexingThreads(indexingThreads_),
_master_task_limit(master_task_limit_),
- _defaultTaskLimit(defaultTaskLimit_),
+ _defaultTaskLimit(std::abs(defaultTaskLimit_)),
+ _is_task_limit_hard(defaultTaskLimit_ >= 0),
_optimize(optimize_),
_kindOfWatermark(kindOfWatermark_),
_reactionTime(reactionTime_),
@@ -81,6 +82,7 @@ ThreadingServiceConfig::operator==(const ThreadingServiceConfig &rhs) const
return _indexingThreads == rhs._indexingThreads &&
_master_task_limit == rhs._master_task_limit &&
_defaultTaskLimit == rhs._defaultTaskLimit &&
+ _is_task_limit_hard == rhs._is_task_limit_hard &&
_optimize == rhs._optimize &&
_kindOfWatermark == rhs._kindOfWatermark &&
_reactionTime == rhs._reactionTime &&
diff --git a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h
index f1a4f0525d1..a54c0674263 100644
--- a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h
+++ b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h
@@ -23,15 +23,16 @@ private:
uint32_t _indexingThreads;
uint32_t _master_task_limit;
uint32_t _defaultTaskLimit;
+ bool _is_task_limit_hard;
OptimizeFor _optimize;
uint32_t _kindOfWatermark;
vespalib::duration _reactionTime; // Maximum reaction time to new tasks
SharedFieldWriterExecutor _shared_field_writer;
private:
- ThreadingServiceConfig(uint32_t indexingThreads_, uint32_t master_task_limit_, uint32_t defaultTaskLimit_,
- OptimizeFor optimize_, uint32_t kindOfWatermark_, vespalib::duration reactionTime_,
- SharedFieldWriterExecutor shared_field_writer_);
+ ThreadingServiceConfig(uint32_t indexingThreads_, uint32_t master_task_limit_, int32_t defaultTaskLimit_,
+ OptimizeFor optimize_, uint32_t kindOfWatermark_,
+ vespalib::duration reactionTime_, SharedFieldWriterExecutor shared_field_writer_);
public:
static ThreadingServiceConfig make(const ProtonConfig &cfg, double concurrency, const HwInfo::Cpu &cpuInfo);
@@ -40,6 +41,7 @@ public:
uint32_t indexingThreads() const { return _indexingThreads; }
uint32_t master_task_limit() const { return _master_task_limit; }
uint32_t defaultTaskLimit() const { return _defaultTaskLimit; }
+ bool is_task_limit_hard() const { return _is_task_limit_hard; }
OptimizeFor optimize() const { return _optimize; }
uint32_t kindOfwatermark() const { return _kindOfWatermark; }
vespalib::duration reactionTime() const { return _reactionTime; }