diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2022-01-19 08:28:59 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2022-01-19 08:28:59 +0000 |
commit | 07d895ff30db9a10989e2d4a076e3015acccfa2a (patch) | |
tree | 9cc7fc1fbe928516543d15505b3544df59d89037 /staging_vespalib | |
parent | 240bd7f0ccf80ebc45fc953eded77404d11fb586 (diff) |
Let the Adaptive Executor have both soft and hard limit.
Diffstat (limited to 'staging_vespalib')
5 files changed, 21 insertions, 11 deletions
diff --git a/staging_vespalib/src/tests/sequencedtaskexecutor/adaptive_sequenced_executor_test.cpp b/staging_vespalib/src/tests/sequencedtaskexecutor/adaptive_sequenced_executor_test.cpp index 5fc6d2a69ae..da31f1c1a79 100644 --- a/staging_vespalib/src/tests/sequencedtaskexecutor/adaptive_sequenced_executor_test.cpp +++ b/staging_vespalib/src/tests/sequencedtaskexecutor/adaptive_sequenced_executor_test.cpp @@ -18,7 +18,7 @@ class Fixture public: AdaptiveSequencedExecutor _threads; - Fixture() : _threads(2, 2, 0, 1000) { } + Fixture(bool is_max_pending_hard=true) : _threads(2, 2, 0, 1000, is_max_pending_hard) { } }; @@ -231,12 +231,12 @@ TEST_F("require that executeLambda works", Fixture) } TEST("require that you get correct number of executors") { - AdaptiveSequencedExecutor seven(7, 1, 0, 10); + AdaptiveSequencedExecutor seven(7, 1, 0, 10, true); EXPECT_EQUAL(7u, seven.getNumExecutors()); } TEST("require that you distribute well") { - AdaptiveSequencedExecutor seven(7, 1, 0, 10); + AdaptiveSequencedExecutor seven(7, 1, 0, 10, true); EXPECT_EQUAL(7u, seven.getNumExecutors()); for (uint32_t id=0; id < 1000; id++) { EXPECT_EQUAL(id%7, seven.getExecutorId(id).getId()); diff --git a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp index c609c538977..0f7c82ef988 100644 --- a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp +++ b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp @@ -55,7 +55,7 @@ int main(int argc, char **argv) { std::atomic<long> counter(0); std::unique_ptr<ISequencedTaskExecutor> executor; if (use_adaptive_executor) { - executor = std::make_unique<AdaptiveSequencedExecutor>(num_strands, num_threads, max_waiting, task_limit); + executor = std::make_unique<AdaptiveSequencedExecutor>(num_strands, num_threads, max_waiting, task_limit, true); } else { auto optimize = optimize_for_throughput ? vespalib::Executor::OptimizeFor::THROUGHPUT diff --git a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp index 4d08e14375c..1e23ba15785 100644 --- a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp @@ -95,7 +95,7 @@ AdaptiveSequencedExecutor::maybe_block_self(std::unique_lock<std::mutex> &lock) while (_self.state == Self::State::BLOCKED) { _self.cond.wait(lock); } - while ((_self.state == Self::State::OPEN) && (_self.pending_tasks >= _cfg.max_pending)) { + while ((_self.state == Self::State::OPEN) && _cfg.is_above_max_pending(_self.pending_tasks)) { _self.state = Self::State::BLOCKED; while (_self.state == Self::State::BLOCKED) { _self.cond.wait(lock); @@ -228,7 +228,8 @@ AdaptiveSequencedExecutor::worker_main() } AdaptiveSequencedExecutor::AdaptiveSequencedExecutor(size_t num_strands, size_t num_threads, - size_t max_waiting, size_t max_pending) + size_t max_waiting, size_t max_pending, + bool is_max_pending_hard) : ISequencedTaskExecutor(num_strands), _thread_tools(std::make_unique<ThreadTools>(*this)), _mutex(), @@ -238,7 +239,7 @@ AdaptiveSequencedExecutor::AdaptiveSequencedExecutor(size_t num_strands, size_t _self(), _stats(), _idleTracker(steady_clock::now()), - _cfg(num_threads, max_waiting, max_pending) + _cfg(num_threads, max_waiting, max_pending, is_max_pending_hard) { _stats.queueSize.add(_self.pending_tasks); _thread_tools->start(num_threads); diff --git a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h index ccf6ab977f3..d6244564fbd 100644 --- a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h +++ b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h @@ -51,14 +51,22 @@ private: size_t max_waiting; size_t max_pending; size_t wakeup_limit; + bool is_max_pending_hard; void set_max_pending(size_t max_pending_in) { max_pending = std::max(1uL, max_pending_in); wakeup_limit = std::max(1uL, size_t(max_pending * 0.9)); assert(wakeup_limit > 0); assert(wakeup_limit <= max_pending); } - Config(size_t num_threads_in, size_t max_waiting_in, size_t max_pending_in) - : num_threads(num_threads_in), max_waiting(max_waiting_in), max_pending(1000), wakeup_limit(900) + bool is_above_max_pending(size_t pending) { + return (pending >= max_pending) && is_max_pending_hard; + } + Config(size_t num_threads_in, size_t max_waiting_in, size_t max_pending_in, bool is_max_pending_hard_in) + : num_threads(num_threads_in), + max_waiting(max_waiting_in), + max_pending(1000), + wakeup_limit(900), + is_max_pending_hard(is_max_pending_hard_in) { assert(num_threads > 0); set_max_pending(max_pending_in); @@ -143,7 +151,8 @@ private: void worker_main(); public: AdaptiveSequencedExecutor(size_t num_strands, size_t num_threads, - size_t max_waiting, size_t max_pending); + size_t max_waiting, size_t max_pending, + bool is_max_pending_hard); ~AdaptiveSequencedExecutor() override; ExecutorId getExecutorId(uint64_t component) const override; void executeTask(ExecutorId id, Task::UP task) override; diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp index 88a679b4cdb..59ffad88d09 100644 --- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp @@ -61,7 +61,7 @@ SequencedTaskExecutor::create(Runnable::init_fun_t func, uint32_t threads, uint3 { if (optimize == OptimizeFor::ADAPTIVE) { size_t num_strands = std::min(taskLimit, threads*32); - return std::make_unique<AdaptiveSequencedExecutor>(num_strands, threads, kindOfWatermark, taskLimit); + return std::make_unique<AdaptiveSequencedExecutor>(num_strands, threads, kindOfWatermark, taskLimit, is_task_limit_hard); } else { auto executors = std::vector<std::unique_ptr<SyncableThreadExecutor>>(); executors.reserve(threads); |