diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2022-01-14 11:54:53 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2022-01-18 14:38:23 +0000 |
commit | aeb3ab8725ee058c65036d582155edc20eff654b (patch) | |
tree | 216f666d0a69da61a0c9053b806e00762319671c /staging_vespalib | |
parent | 68da7d45fc5b6e6686963aed4432107a20c74b1f (diff) |
Wire in control of whether taskLimit is hard.
Diffstat (limited to 'staging_vespalib')
5 files changed, 46 insertions, 14 deletions
diff --git a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp index 3528cf74040..c609c538977 100644 --- a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp +++ b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp @@ -60,7 +60,7 @@ int main(int argc, char **argv) { auto optimize = optimize_for_throughput ? vespalib::Executor::OptimizeFor::THROUGHPUT : vespalib::Executor::OptimizeFor::LATENCY; - executor = SequencedTaskExecutor::create(sequenced_executor, num_strands, task_limit, optimize); + executor = SequencedTaskExecutor::create(sequenced_executor, num_strands, task_limit, true, optimize); } vespalib::Timer timer; for (size_t task_id = 0; task_id < num_tasks; ++task_id) { diff --git a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp index 243935d4013..705d6346e8c 100644 --- a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp +++ b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp @@ -2,7 +2,8 @@ #include <vespa/vespalib/util/sequencedtaskexecutor.h> #include <vespa/vespalib/util/adaptive_sequenced_executor.h> - +#include <vespa/vespalib/util/blockingthreadstackexecutor.h> +#include <vespa/vespalib/util/singleexecutor.h> #include <vespa/vespalib/testkit/testapp.h> #include <vespa/vespalib/test/insertion_operators.h> @@ -22,7 +23,10 @@ class Fixture public: std::unique_ptr<ISequencedTaskExecutor> _threads; - Fixture() : _threads(SequencedTaskExecutor::create(sequenced_executor, 2)) { } + Fixture(bool is_task_limit_hard = true) : + _threads(SequencedTaskExecutor::create(sequenced_executor, 2, 1000, is_task_limit_hard, + Executor::OptimizeFor::LATENCY)) + { } }; @@ -258,6 +262,28 @@ TEST("require that you get correct number of executors") { EXPECT_EQUAL(7u, seven->getNumExecutors()); } +void verifyHardLimitForLatency(bool expect_hard) { + auto sequenced = SequencedTaskExecutor::create(sequenced_executor, 1, 100, expect_hard, Executor::OptimizeFor::LATENCY); + const SequencedTaskExecutor & seq = dynamic_cast<const SequencedTaskExecutor &>(*sequenced); + EXPECT_EQUAL(expect_hard,nullptr != dynamic_cast<const BlockingThreadStackExecutor *>(seq.first_executor())); +} + +void verifyHardLimitForThroughput(bool expect_hard) { + auto sequenced = SequencedTaskExecutor::create(sequenced_executor, 1, 100, expect_hard, Executor::OptimizeFor::THROUGHPUT); + const SequencedTaskExecutor & seq = dynamic_cast<const SequencedTaskExecutor &>(*sequenced); + const SingleExecutor * first = dynamic_cast<const SingleExecutor *>(seq.first_executor()); + EXPECT_TRUE(first != nullptr); + EXPECT_EQUAL(expect_hard, first->isBlocking()); +} + +TEST("require that you can get executor with both hard and soft limit") { + verifyHardLimitForLatency(true); + verifyHardLimitForLatency(false); + verifyHardLimitForThroughput(true); + verifyHardLimitForThroughput(false); +} + + TEST("require that you distribute well") { auto seven = SequencedTaskExecutor::create(sequenced_executor, 7); const SequencedTaskExecutor & seq = dynamic_cast<const SequencedTaskExecutor &>(*seven); @@ -307,15 +333,15 @@ TEST("Test creation of different types") { auto * seq = dynamic_cast<SequencedTaskExecutor *>(iseq.get()); ASSERT_TRUE(seq != nullptr); - iseq = SequencedTaskExecutor::create(sequenced_executor, 1, 1000, Executor::OptimizeFor::LATENCY); + iseq = SequencedTaskExecutor::create(sequenced_executor, 1, 1000, true, Executor::OptimizeFor::LATENCY); seq = dynamic_cast<SequencedTaskExecutor *>(iseq.get()); ASSERT_TRUE(seq != nullptr); - iseq = SequencedTaskExecutor::create(sequenced_executor, 1, 1000, Executor::OptimizeFor::THROUGHPUT); + iseq = SequencedTaskExecutor::create(sequenced_executor, 1, 1000, true, Executor::OptimizeFor::THROUGHPUT); seq = dynamic_cast<SequencedTaskExecutor *>(iseq.get()); ASSERT_TRUE(seq != nullptr); - iseq = SequencedTaskExecutor::create(sequenced_executor, 1, 1000, Executor::OptimizeFor::ADAPTIVE, 17); + iseq = SequencedTaskExecutor::create(sequenced_executor, 1, 1000, true, Executor::OptimizeFor::ADAPTIVE, 17); auto aseq = dynamic_cast<AdaptiveSequencedExecutor *>(iseq.get()); ASSERT_TRUE(aseq != nullptr); } diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp index 58ae862f7c6..88a679b4cdb 100644 --- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp @@ -3,6 +3,7 @@ #include "sequencedtaskexecutor.h" #include "adaptive_sequenced_executor.h" #include "singleexecutor.h" +#include <vespa/vespalib/util/threadstackexecutor.h> #include <vespa/vespalib/util/blockingthreadstackexecutor.h> #include <vespa/vespalib/util/size_literals.h> #include <vespa/vespalib/stllike/hashtable.h> @@ -46,17 +47,17 @@ SequencedTaskExecutor::create(Runnable::init_fun_t func, uint32_t threads) { std::unique_ptr<ISequencedTaskExecutor> SequencedTaskExecutor::create(Runnable::init_fun_t func, uint32_t threads, uint32_t taskLimit) { - return create(func, threads, taskLimit, OptimizeFor::LATENCY); + return create(func, threads, taskLimit, true, OptimizeFor::LATENCY); } std::unique_ptr<ISequencedTaskExecutor> -SequencedTaskExecutor::create(Runnable::init_fun_t func, uint32_t threads, uint32_t taskLimit, OptimizeFor optimize) { - return create(func, threads, taskLimit, optimize, 0); +SequencedTaskExecutor::create(Runnable::init_fun_t func, uint32_t threads, uint32_t taskLimit, bool is_task_limit_hard, OptimizeFor optimize) { + return create(func, threads, taskLimit, is_task_limit_hard, optimize, 0); } std::unique_ptr<ISequencedTaskExecutor> SequencedTaskExecutor::create(Runnable::init_fun_t func, uint32_t threads, uint32_t taskLimit, - OptimizeFor optimize, uint32_t kindOfWatermark) + bool is_task_limit_hard, OptimizeFor optimize, uint32_t kindOfWatermark) { if (optimize == OptimizeFor::ADAPTIVE) { size_t num_strands = std::min(taskLimit, threads*32); @@ -67,9 +68,13 @@ SequencedTaskExecutor::create(Runnable::init_fun_t func, uint32_t threads, uint3 for (uint32_t id = 0; id < threads; ++id) { if (optimize == OptimizeFor::THROUGHPUT) { uint32_t watermark = (kindOfWatermark == 0) ? taskLimit / 10 : kindOfWatermark; - executors.push_back(std::make_unique<SingleExecutor>(func, taskLimit, true, watermark, 100ms)); + executors.push_back(std::make_unique<SingleExecutor>(func, taskLimit, is_task_limit_hard, watermark, 100ms)); } else { - executors.push_back(std::make_unique<BlockingThreadStackExecutor>(1, stackSize, taskLimit, func)); + if (is_task_limit_hard) { + executors.push_back(std::make_unique<BlockingThreadStackExecutor>(1, stackSize, taskLimit, func)); + } else { + executors.push_back(std::make_unique<ThreadStackExecutor>(1, stackSize, func)); + } } } return std::unique_ptr<ISequencedTaskExecutor>(new SequencedTaskExecutor(std::move(executors))); diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h index 91304a6a2e3..a4b1b82aacf 100644 --- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h +++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h @@ -35,10 +35,10 @@ public: static std::unique_ptr<ISequencedTaskExecutor> create(Runnable::init_fun_t func, uint32_t threads, uint32_t taskLimit); static std::unique_ptr<ISequencedTaskExecutor> - create(Runnable::init_fun_t func, uint32_t threads, uint32_t taskLimit, OptimizeFor optimize); + create(Runnable::init_fun_t func, uint32_t threads, uint32_t taskLimit, bool is_task_limit_hard, OptimizeFor optimize); static std::unique_ptr<ISequencedTaskExecutor> create(Runnable::init_fun_t func, uint32_t threads, uint32_t taskLimit, - OptimizeFor optimize, uint32_t kindOfWatermark); + bool is_task_limit_hard, OptimizeFor optimize, uint32_t kindOfWatermark); /** * For testing only */ diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h index 4fdc217e701..dd755a76302 100644 --- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h +++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h @@ -33,6 +33,7 @@ public: duration get_reaction_time() const { return _reactionTime; } ExecutorStats getStats() override; SingleExecutor & shutdown() override; + bool isBlocking() const { return !_overflow; } private: using Lock = std::unique_lock<std::mutex>; void drain(Lock & lock); |