diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2021-12-06 17:49:36 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2021-12-06 17:49:36 +0000 |
commit | 7f71eddb09917cf59091236c7f1b0701c038d5f1 (patch) | |
tree | 0b424617a3eca985e38a8df256c9f79474b9d61a /staging_vespalib | |
parent | 30827a4c8ed38be228f26b590a67eab6321ec76e (diff) |
Add testing of watermark and change it to have the ration to the taskLimit as
it had on initial construction time.
Diffstat (limited to 'staging_vespalib')
3 files changed, 15 insertions, 7 deletions
diff --git a/staging_vespalib/src/tests/singleexecutor/singleexecutor_test.cpp b/staging_vespalib/src/tests/singleexecutor/singleexecutor_test.cpp index fae57b6532f..dd71380f64a 100644 --- a/staging_vespalib/src/tests/singleexecutor/singleexecutor_test.cpp +++ b/staging_vespalib/src/tests/singleexecutor/singleexecutor_test.cpp @@ -37,7 +37,10 @@ void verifyResizeTaskLimit(bool up) { std::atomic<uint64_t> allowed(0); constexpr uint32_t INITIAL = 20; const uint32_t INITIAL_2inN = roundUp2inN(INITIAL); - SingleExecutor executor(sequenced_executor, INITIAL, INITIAL/2, 10ms); + double waterMarkRatio = 0.5; + SingleExecutor executor(sequenced_executor, INITIAL, INITIAL*waterMarkRatio, 10ms); + EXPECT_EQUAL(INITIAL_2inN, executor.getTaskLimit()); + EXPECT_EQUAL(uint32_t(INITIAL_2inN*waterMarkRatio), executor.get_watermark()); uint32_t targetTaskLimit = up ? 40 : 5; uint32_t roundedTaskLimit = roundUp2inN(targetTaskLimit); @@ -56,6 +59,7 @@ void verifyResizeTaskLimit(bool up) { EXPECT_EQUAL(1u, started); executor.setTaskLimit(targetTaskLimit); EXPECT_EQUAL(INITIAL_2inN, executor.getTaskLimit()); + EXPECT_EQUAL(INITIAL_2inN*waterMarkRatio, executor.get_watermark()); allowed = 5; while (started < 6); EXPECT_EQUAL(6u, started); @@ -74,8 +78,10 @@ void verifyResizeTaskLimit(bool up) { while (started < INITIAL + 1); EXPECT_EQUAL(INITIAL + 1, started); EXPECT_EQUAL(roundedTaskLimit, executor.getTaskLimit()); + EXPECT_EQUAL(roundedTaskLimit*waterMarkRatio, executor.get_watermark()); allowed = INITIAL + 1; } + TEST("test that resizing up and down works") { TEST_DO(verifyResizeTaskLimit(true)); TEST_DO(verifyResizeTaskLimit(false)); diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp index 2ebc64cf62a..ff46f58cb84 100644 --- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp @@ -27,7 +27,7 @@ SingleExecutor::SingleExecutor(init_fun_t func, uint32_t taskLimit, uint32_t wat _wakeupConsumerAt(0), _producerNeedWakeupAt(0), _wp(0), - _watermark(std::min(_taskLimit.load(), watermark)), + _watermark(std::min(_taskLimit.load(), (watermark*_taskLimit)/taskLimit)), _reactionTime(reactionTime), _closed(false) { @@ -75,7 +75,7 @@ SingleExecutor::execute(Task::UP task) { void SingleExecutor::setTaskLimit(uint32_t taskLimit) { - _wantedTaskLimit = vespalib::roundUp2inN(std::max(taskLimit, _watermark)); + _wantedTaskLimit = vespalib::roundUp2inN(taskLimit); } void @@ -117,7 +117,7 @@ SingleExecutor::run() { while (!_thread.stopped()) { drain_tasks(); _producerCondition.notify_all(); - _wakeupConsumerAt.store(_wp.load(std::memory_order_relaxed) + _watermark, std::memory_order_relaxed); + _wakeupConsumerAt.store(_wp.load(std::memory_order_relaxed) + get_watermark(), std::memory_order_relaxed); Lock lock(_mutex); if (numTasks() <= 0) { steady_time now = steady_clock::now(); @@ -158,11 +158,13 @@ SingleExecutor::wait_for_room(Lock & lock) { if (taskLimit != _wantedTaskLimit.load(std::memory_order_relaxed)) { drain(lock); _tasks = std::make_unique<Task::UP[]>(_wantedTaskLimit); + double waterMarkRatio = double(get_watermark()) / _taskLimit.load(std::memory_order_relaxed); _taskLimit = _wantedTaskLimit.load(); + _watermark = _taskLimit * waterMarkRatio; } _queueSize.add(numTasks()); while (numTasks() >= _taskLimit.load(std::memory_order_relaxed)) { - sleepProducer(lock, _reactionTime, wp - _watermark); + sleepProducer(lock, _reactionTime, wp - get_watermark()); } } diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h index 7d868322558..9bc2f3ca837 100644 --- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h +++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h @@ -28,7 +28,7 @@ public: void wakeup() override; size_t getNumThreads() const override; uint32_t getTaskLimit() const override { return _taskLimit.load(std::memory_order_relaxed); } - uint32_t get_watermark() const { return _watermark; } + uint32_t get_watermark() const { return _watermark.load(std::memory_order_relaxed); } duration get_reaction_time() const { return _reactionTime; } ExecutorStats getStats() override; SingleExecutor & shutdown() override; @@ -63,7 +63,7 @@ private: std::atomic<uint64_t> _wakeupConsumerAt; std::atomic<uint64_t> _producerNeedWakeupAt; std::atomic<uint64_t> _wp; - const uint32_t _watermark; + std::atomic<uint32_t> _watermark; const duration _reactionTime; bool _closed; }; |