diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2022-01-18 22:01:06 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-01-18 22:01:06 +0100 |
commit | ba063a6ca30deee66590efe4ae6e4e9f6ac83b5c (patch) | |
tree | f8ffa9a2c5cb83fcda57a14d61dbf7829372c21a | |
parent | 04e894ae96dc13b44762f600dc6a4b3f97df13c7 (diff) | |
parent | aeb3ab8725ee058c65036d582155edc20eff654b (diff) |
Merge pull request #20857 from vespa-engine/balder/wire-in-control-of-unbound-q
Wire in control of whether taskLimit is hard.
12 files changed, 72 insertions, 26 deletions
diff --git a/searchcore/src/tests/proton/documentdb/threading_service_config/threading_service_config_test.cpp b/searchcore/src/tests/proton/documentdb/threading_service_config/threading_service_config_test.cpp index a59b3e9bc6f..fc8bd474813 100644 --- a/searchcore/src/tests/proton/documentdb/threading_service_config/threading_service_config_test.cpp +++ b/searchcore/src/tests/proton/documentdb/threading_service_config/threading_service_config_test.cpp @@ -14,11 +14,11 @@ using ProtonConfigBuilder = vespa::config::search::core::ProtonConfigBuilder; struct Fixture { ProtonConfig cfg; - Fixture(uint32_t baseLineIndexingThreads = 2, uint32_t master_task_limit = 2000, uint32_t task_limit = 500) + Fixture(uint32_t baseLineIndexingThreads = 2, uint32_t master_task_limit = 2000, int32_t task_limit = 500) : cfg(makeConfig(baseLineIndexingThreads, master_task_limit, task_limit)) { } - ProtonConfig makeConfig(uint32_t baseLineIndexingThreads, uint32_t master_task_limit, uint32_t task_limit) { + ProtonConfig makeConfig(uint32_t baseLineIndexingThreads, uint32_t master_task_limit, int32_t task_limit) { ProtonConfigBuilder builder; builder.indexing.threads = baseLineIndexingThreads; builder.indexing.tasklimit = task_limit; @@ -56,6 +56,15 @@ TEST_F("require that task limits are set", Fixture) auto tcfg = f.make(24); EXPECT_EQUAL(2000u, tcfg.master_task_limit()); EXPECT_EQUAL(500u, tcfg.defaultTaskLimit()); + EXPECT_TRUE(tcfg.is_task_limit_hard()); +} + +TEST_F("require that negative task limit makes it soft", Fixture(2, 3000, -700)) +{ + auto tcfg = f.make(24); + EXPECT_EQUAL(3000u, tcfg.master_task_limit()); + EXPECT_EQUAL(700u, tcfg.defaultTaskLimit()); + EXPECT_FALSE(tcfg.is_task_limit_hard()); } namespace { diff --git a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp index 36c8070f140..47eaef2b6b5 100644 --- a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp @@ -72,7 +72,7 @@ ExecutorThreadingService::ExecutorThreadingService(vespalib::ThreadExecutor& sha if (_shared_field_writer == SharedFieldWriterExecutor::INDEX) { _field_writer = SequencedTaskExecutor::create(field_writer_executor, cfg.indexingThreads() * 2, cfg.defaultTaskLimit()); _attributeFieldWriter = SequencedTaskExecutor::create(attribute_field_writer_executor, cfg.indexingThreads(), cfg.defaultTaskLimit(), - cfg.optimize(), cfg.kindOfwatermark()); + cfg.is_task_limit_hard(), cfg.optimize(), cfg.kindOfwatermark()); if (cfg.optimize() == vespalib::Executor::OptimizeFor::THROUGHPUT && invokerService) { _invokeRegistrations.push_back(invokerService->registerInvoke([executor=_attributeFieldWriter.get()](){ executor->wakeup();})); } @@ -82,7 +82,7 @@ ExecutorThreadingService::ExecutorThreadingService(vespalib::ThreadExecutor& sha } else if (_shared_field_writer == SharedFieldWriterExecutor::INDEX_AND_ATTRIBUTE) { _field_writer = SequencedTaskExecutor::create(field_writer_executor, cfg.indexingThreads() * 3, cfg.defaultTaskLimit(), - cfg.optimize(), cfg.kindOfwatermark()); + cfg.is_task_limit_hard(), cfg.optimize(), cfg.kindOfwatermark()); if (cfg.optimize() == vespalib::Executor::OptimizeFor::THROUGHPUT && invokerService) { _invokeRegistrations.push_back(invokerService->registerInvoke([executor=_field_writer.get()](){ executor->wakeup();})); } @@ -98,7 +98,7 @@ ExecutorThreadingService::ExecutorThreadingService(vespalib::ThreadExecutor& sha _indexFieldInverter = SequencedTaskExecutor::create(index_field_inverter_executor, cfg.indexingThreads(), cfg.defaultTaskLimit()); _indexFieldWriter = SequencedTaskExecutor::create(index_field_writer_executor, cfg.indexingThreads(), cfg.defaultTaskLimit()); _attributeFieldWriter = SequencedTaskExecutor::create(attribute_field_writer_executor, cfg.indexingThreads(), cfg.defaultTaskLimit(), - cfg.optimize(), cfg.kindOfwatermark()); + cfg.is_task_limit_hard(), cfg.optimize(), cfg.kindOfwatermark()); if (cfg.optimize() == vespalib::Executor::OptimizeFor::THROUGHPUT && invokerService) { _invokeRegistrations.push_back(invokerService->registerInvoke([executor=_attributeFieldWriter.get()](){ executor->wakeup();})); } diff --git a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp index e32cd6f5f4e..5a3eaa02d3b 100644 --- a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp @@ -28,6 +28,7 @@ SharedThreadingService::SharedThreadingService(const SharedThreadingServiceConfi _field_writer = vespalib::SequencedTaskExecutor::create(proton_field_writer_executor, fw_cfg.indexingThreads() * 3, fw_cfg.defaultTaskLimit(), + fw_cfg.is_task_limit_hard(), fw_cfg.optimize(), fw_cfg.kindOfwatermark()); if (fw_cfg.optimize() == vespalib::Executor::OptimizeFor::THROUGHPUT) { diff --git a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp index 7982e8a8414..335d5bab8d0 100644 --- a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp @@ -12,14 +12,15 @@ using OptimizeFor = vespalib::Executor::OptimizeFor; ThreadingServiceConfig::ThreadingServiceConfig(uint32_t indexingThreads_, uint32_t master_task_limit_, - uint32_t defaultTaskLimit_, + int32_t defaultTaskLimit_, OptimizeFor optimize_, uint32_t kindOfWatermark_, vespalib::duration reactionTime_, SharedFieldWriterExecutor shared_field_writer_) : _indexingThreads(indexingThreads_), _master_task_limit(master_task_limit_), - _defaultTaskLimit(defaultTaskLimit_), + _defaultTaskLimit(std::abs(defaultTaskLimit_)), + _is_task_limit_hard(defaultTaskLimit_ >= 0), _optimize(optimize_), _kindOfWatermark(kindOfWatermark_), _reactionTime(reactionTime_), @@ -81,6 +82,7 @@ ThreadingServiceConfig::operator==(const ThreadingServiceConfig &rhs) const return _indexingThreads == rhs._indexingThreads && _master_task_limit == rhs._master_task_limit && _defaultTaskLimit == rhs._defaultTaskLimit && + _is_task_limit_hard == rhs._is_task_limit_hard && _optimize == rhs._optimize && _kindOfWatermark == rhs._kindOfWatermark && _reactionTime == rhs._reactionTime && diff --git a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h index f1a4f0525d1..a54c0674263 100644 --- a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h +++ b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h @@ -23,15 +23,16 @@ private: uint32_t _indexingThreads; uint32_t _master_task_limit; uint32_t _defaultTaskLimit; + bool _is_task_limit_hard; OptimizeFor _optimize; uint32_t _kindOfWatermark; vespalib::duration _reactionTime; // Maximum reaction time to new tasks SharedFieldWriterExecutor _shared_field_writer; private: - ThreadingServiceConfig(uint32_t indexingThreads_, uint32_t master_task_limit_, uint32_t defaultTaskLimit_, - OptimizeFor optimize_, uint32_t kindOfWatermark_, vespalib::duration reactionTime_, - SharedFieldWriterExecutor shared_field_writer_); + ThreadingServiceConfig(uint32_t indexingThreads_, uint32_t master_task_limit_, int32_t defaultTaskLimit_, + OptimizeFor optimize_, uint32_t kindOfWatermark_, + vespalib::duration reactionTime_, SharedFieldWriterExecutor shared_field_writer_); public: static ThreadingServiceConfig make(const ProtonConfig &cfg, double concurrency, const HwInfo::Cpu &cpuInfo); @@ -40,6 +41,7 @@ public: uint32_t indexingThreads() const { return _indexingThreads; } uint32_t master_task_limit() const { return _master_task_limit; } uint32_t defaultTaskLimit() const { return _defaultTaskLimit; } + bool is_task_limit_hard() const { return _is_task_limit_hard; } OptimizeFor optimize() const { return _optimize; } uint32_t kindOfwatermark() const { return _kindOfWatermark; } vespalib::duration reactionTime() const { return _reactionTime; } diff --git a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp index 3528cf74040..c609c538977 100644 --- a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp +++ b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp @@ -60,7 +60,7 @@ int main(int argc, char **argv) { auto optimize = optimize_for_throughput ? vespalib::Executor::OptimizeFor::THROUGHPUT : vespalib::Executor::OptimizeFor::LATENCY; - executor = SequencedTaskExecutor::create(sequenced_executor, num_strands, task_limit, optimize); + executor = SequencedTaskExecutor::create(sequenced_executor, num_strands, task_limit, true, optimize); } vespalib::Timer timer; for (size_t task_id = 0; task_id < num_tasks; ++task_id) { diff --git a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp index 243935d4013..705d6346e8c 100644 --- a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp +++ b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp @@ -2,7 +2,8 @@ #include <vespa/vespalib/util/sequencedtaskexecutor.h> #include <vespa/vespalib/util/adaptive_sequenced_executor.h> - +#include <vespa/vespalib/util/blockingthreadstackexecutor.h> +#include <vespa/vespalib/util/singleexecutor.h> #include <vespa/vespalib/testkit/testapp.h> #include <vespa/vespalib/test/insertion_operators.h> @@ -22,7 +23,10 @@ class Fixture public: std::unique_ptr<ISequencedTaskExecutor> _threads; - Fixture() : _threads(SequencedTaskExecutor::create(sequenced_executor, 2)) { } + Fixture(bool is_task_limit_hard = true) : + _threads(SequencedTaskExecutor::create(sequenced_executor, 2, 1000, is_task_limit_hard, + Executor::OptimizeFor::LATENCY)) + { } }; @@ -258,6 +262,28 @@ TEST("require that you get correct number of executors") { EXPECT_EQUAL(7u, seven->getNumExecutors()); } +void verifyHardLimitForLatency(bool expect_hard) { + auto sequenced = SequencedTaskExecutor::create(sequenced_executor, 1, 100, expect_hard, Executor::OptimizeFor::LATENCY); + const SequencedTaskExecutor & seq = dynamic_cast<const SequencedTaskExecutor &>(*sequenced); + EXPECT_EQUAL(expect_hard,nullptr != dynamic_cast<const BlockingThreadStackExecutor *>(seq.first_executor())); +} + +void verifyHardLimitForThroughput(bool expect_hard) { + auto sequenced = SequencedTaskExecutor::create(sequenced_executor, 1, 100, expect_hard, Executor::OptimizeFor::THROUGHPUT); + const SequencedTaskExecutor & seq = dynamic_cast<const SequencedTaskExecutor &>(*sequenced); + const SingleExecutor * first = dynamic_cast<const SingleExecutor *>(seq.first_executor()); + EXPECT_TRUE(first != nullptr); + EXPECT_EQUAL(expect_hard, first->isBlocking()); +} + +TEST("require that you can get executor with both hard and soft limit") { + verifyHardLimitForLatency(true); + verifyHardLimitForLatency(false); + verifyHardLimitForThroughput(true); + verifyHardLimitForThroughput(false); +} + + TEST("require that you distribute well") { auto seven = SequencedTaskExecutor::create(sequenced_executor, 7); const SequencedTaskExecutor & seq = dynamic_cast<const SequencedTaskExecutor &>(*seven); @@ -307,15 +333,15 @@ TEST("Test creation of different types") { auto * seq = dynamic_cast<SequencedTaskExecutor *>(iseq.get()); ASSERT_TRUE(seq != nullptr); - iseq = SequencedTaskExecutor::create(sequenced_executor, 1, 1000, Executor::OptimizeFor::LATENCY); + iseq = SequencedTaskExecutor::create(sequenced_executor, 1, 1000, true, Executor::OptimizeFor::LATENCY); seq = dynamic_cast<SequencedTaskExecutor *>(iseq.get()); ASSERT_TRUE(seq != nullptr); - iseq = SequencedTaskExecutor::create(sequenced_executor, 1, 1000, Executor::OptimizeFor::THROUGHPUT); + iseq = SequencedTaskExecutor::create(sequenced_executor, 1, 1000, true, Executor::OptimizeFor::THROUGHPUT); seq = dynamic_cast<SequencedTaskExecutor *>(iseq.get()); ASSERT_TRUE(seq != nullptr); - iseq = SequencedTaskExecutor::create(sequenced_executor, 1, 1000, Executor::OptimizeFor::ADAPTIVE, 17); + iseq = SequencedTaskExecutor::create(sequenced_executor, 1, 1000, true, Executor::OptimizeFor::ADAPTIVE, 17); auto aseq = dynamic_cast<AdaptiveSequencedExecutor *>(iseq.get()); ASSERT_TRUE(aseq != nullptr); } diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp index 58ae862f7c6..88a679b4cdb 100644 --- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp @@ -3,6 +3,7 @@ #include "sequencedtaskexecutor.h" #include "adaptive_sequenced_executor.h" #include "singleexecutor.h" +#include <vespa/vespalib/util/threadstackexecutor.h> #include <vespa/vespalib/util/blockingthreadstackexecutor.h> #include <vespa/vespalib/util/size_literals.h> #include <vespa/vespalib/stllike/hashtable.h> @@ -46,17 +47,17 @@ SequencedTaskExecutor::create(Runnable::init_fun_t func, uint32_t threads) { std::unique_ptr<ISequencedTaskExecutor> SequencedTaskExecutor::create(Runnable::init_fun_t func, uint32_t threads, uint32_t taskLimit) { - return create(func, threads, taskLimit, OptimizeFor::LATENCY); + return create(func, threads, taskLimit, true, OptimizeFor::LATENCY); } std::unique_ptr<ISequencedTaskExecutor> -SequencedTaskExecutor::create(Runnable::init_fun_t func, uint32_t threads, uint32_t taskLimit, OptimizeFor optimize) { - return create(func, threads, taskLimit, optimize, 0); +SequencedTaskExecutor::create(Runnable::init_fun_t func, uint32_t threads, uint32_t taskLimit, bool is_task_limit_hard, OptimizeFor optimize) { + return create(func, threads, taskLimit, is_task_limit_hard, optimize, 0); } std::unique_ptr<ISequencedTaskExecutor> SequencedTaskExecutor::create(Runnable::init_fun_t func, uint32_t threads, uint32_t taskLimit, - OptimizeFor optimize, uint32_t kindOfWatermark) + bool is_task_limit_hard, OptimizeFor optimize, uint32_t kindOfWatermark) { if (optimize == OptimizeFor::ADAPTIVE) { size_t num_strands = std::min(taskLimit, threads*32); @@ -67,9 +68,13 @@ SequencedTaskExecutor::create(Runnable::init_fun_t func, uint32_t threads, uint3 for (uint32_t id = 0; id < threads; ++id) { if (optimize == OptimizeFor::THROUGHPUT) { uint32_t watermark = (kindOfWatermark == 0) ? taskLimit / 10 : kindOfWatermark; - executors.push_back(std::make_unique<SingleExecutor>(func, taskLimit, true, watermark, 100ms)); + executors.push_back(std::make_unique<SingleExecutor>(func, taskLimit, is_task_limit_hard, watermark, 100ms)); } else { - executors.push_back(std::make_unique<BlockingThreadStackExecutor>(1, stackSize, taskLimit, func)); + if (is_task_limit_hard) { + executors.push_back(std::make_unique<BlockingThreadStackExecutor>(1, stackSize, taskLimit, func)); + } else { + executors.push_back(std::make_unique<ThreadStackExecutor>(1, stackSize, func)); + } } } return std::unique_ptr<ISequencedTaskExecutor>(new SequencedTaskExecutor(std::move(executors))); diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h index 91304a6a2e3..a4b1b82aacf 100644 --- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h +++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h @@ -35,10 +35,10 @@ public: static std::unique_ptr<ISequencedTaskExecutor> create(Runnable::init_fun_t func, uint32_t threads, uint32_t taskLimit); static std::unique_ptr<ISequencedTaskExecutor> - create(Runnable::init_fun_t func, uint32_t threads, uint32_t taskLimit, OptimizeFor optimize); + create(Runnable::init_fun_t func, uint32_t threads, uint32_t taskLimit, bool is_task_limit_hard, OptimizeFor optimize); static std::unique_ptr<ISequencedTaskExecutor> create(Runnable::init_fun_t func, uint32_t threads, uint32_t taskLimit, - OptimizeFor optimize, uint32_t kindOfWatermark); + bool is_task_limit_hard, OptimizeFor optimize, uint32_t kindOfWatermark); /** * For testing only */ diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h index 4fdc217e701..dd755a76302 100644 --- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h +++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h @@ -33,6 +33,7 @@ public: duration get_reaction_time() const { return _reactionTime; } ExecutorStats getStats() override; SingleExecutor & shutdown() override; + bool isBlocking() const { return !_overflow; } private: using Lock = std::unique_lock<std::mutex>; void drain(Lock & lock); diff --git a/storage/src/tests/persistence/persistencetestutils.cpp b/storage/src/tests/persistence/persistencetestutils.cpp index c163f6de024..c35be2789da 100644 --- a/storage/src/tests/persistence/persistencetestutils.cpp +++ b/storage/src/tests/persistence/persistencetestutils.cpp @@ -127,7 +127,7 @@ VESPA_THREAD_STACK_TAG(test_executor) void PersistenceTestUtils::setupExecutor(uint32_t numThreads) { - _sequenceTaskExecutor = vespalib::SequencedTaskExecutor::create(test_executor, numThreads, 1000, vespalib::Executor::OptimizeFor::ADAPTIVE); + _sequenceTaskExecutor = vespalib::SequencedTaskExecutor::create(test_executor, numThreads, 1000, true, vespalib::Executor::OptimizeFor::ADAPTIVE); } StorBucketDatabase::WrappedEntry diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp index 4a215c7a348..530d96dfe3f 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp @@ -218,7 +218,7 @@ FileStorManager::configure(std::unique_ptr<StorFilestorConfig> config) _compReg, std::move(operation_throttler)); uint32_t numResponseThreads = computeNumResponseThreads(_config->numResponseThreads); _sequencedExecutor = vespalib::SequencedTaskExecutor::create(response_executor, numResponseThreads, 10000, - selectSequencer(_config->responseSequencerType)); + true, selectSequencer(_config->responseSequencerType)); assert(_sequencedExecutor); LOG(spam, "Setting up the disk"); for (uint32_t i = 0; i < numThreads; i++) { |