From ba1dcd889e54af3c16c2122c000cca94ca03d22e Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Mon, 6 Apr 2020 15:06:43 +0000 Subject: Add control for kind_of_watermark and reaction time. --- .../sequencedtaskexecutor_test.cpp | 22 ++++++++++++++++++++++ .../vespa/vespalib/util/sequencedtaskexecutor.cpp | 7 ++++--- .../vespa/vespalib/util/sequencedtaskexecutor.h | 4 +++- .../src/vespa/vespalib/util/singleexecutor.cpp | 16 ++++++++++++---- .../src/vespa/vespalib/util/singleexecutor.h | 3 +++ 5 files changed, 44 insertions(+), 8 deletions(-) (limited to 'staging_vespalib/src') diff --git a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp index f5f04738e92..70d0f1c743d 100644 --- a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp +++ b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp @@ -1,6 +1,8 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include +#include + #include #include @@ -245,6 +247,26 @@ TEST("require that you distribute well") { EXPECT_EQUAL(97u, seven->getComponentEffectiveHashSize()); } +TEST("Test creation of different types") { + auto iseq = SequencedTaskExecutor::create(1); + + EXPECT_EQUAL(1u, iseq->getNumExecutors()); + auto * seq = dynamic_cast(iseq.get()); + ASSERT_TRUE(seq != nullptr); + + iseq = SequencedTaskExecutor::create(1, 1000, Executor::OptimizeFor::LATENCY); + seq = dynamic_cast(iseq.get()); + ASSERT_TRUE(seq != nullptr); + + iseq = SequencedTaskExecutor::create(1, 1000, Executor::OptimizeFor::THROUGHPUT); + seq = dynamic_cast(iseq.get()); + ASSERT_TRUE(seq != nullptr); + + iseq = SequencedTaskExecutor::create(1, 1000, Executor::OptimizeFor::ADAPTIVE, 17); + auto aseq = dynamic_cast(iseq.get()); + ASSERT_TRUE(aseq != nullptr); +} + } TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp index 6d42895bffc..a0c2f0ac237 100644 --- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp @@ -15,16 +15,17 @@ constexpr uint32_t stackSize = 128 * 1024; std::unique_ptr -SequencedTaskExecutor::create(uint32_t threads, uint32_t taskLimit, OptimizeFor optimize) +SequencedTaskExecutor::create(uint32_t threads, uint32_t taskLimit, OptimizeFor optimize, uint32_t kindOfWatermark, duration reactionTime) { if (optimize == OptimizeFor::ADAPTIVE) { - return std::make_unique(threads, threads, 0, taskLimit); + return std::make_unique(threads, threads, kindOfWatermark, taskLimit); } else { auto executors = std::make_unique>>(); executors->reserve(threads); for (uint32_t id = 0; id < threads; ++id) { if (optimize == OptimizeFor::THROUGHPUT) { - executors->push_back(std::make_unique(taskLimit)); + uint32_t watermark = kindOfWatermark == 0 ? taskLimit / 10 : kindOfWatermark; + executors->push_back(std::make_unique(taskLimit, watermark, reactionTime)); } else { executors->push_back(std::make_unique(1, stackSize, taskLimit)); } diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h index 14127970403..b3dd400478a 100644 --- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h +++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h @@ -2,6 +2,7 @@ #pragma once #include "isequencedtaskexecutor.h" +#include namespace vespalib { @@ -30,9 +31,10 @@ public: /* * Note that if you choose Optimize::THROUGHPUT, you must ensure only a single producer, or synchronize on the outside. + * */ static std::unique_ptr - create(uint32_t threads, uint32_t taskLimit = 1000, OptimizeFor optimize = OptimizeFor::LATENCY); + create(uint32_t threads, uint32_t taskLimit = 1000, OptimizeFor optimize = OptimizeFor::LATENCY, uint32_t kindOfWatermark = 0, duration reactionTime = 10ms); }; } // namespace search diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp index 90eb18c23ef..a17037799a3 100644 --- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp @@ -6,6 +6,10 @@ namespace vespalib { SingleExecutor::SingleExecutor(uint32_t taskLimit) + : SingleExecutor(taskLimit, taskLimit/10, 5ms) +{ } + +SingleExecutor::SingleExecutor(uint32_t taskLimit, uint32_t watermark, duration reactionTime) : _taskLimit(vespalib::roundUp2inN(taskLimit)), _wantedTaskLimit(_taskLimit.load()), _rp(0), @@ -19,10 +23,14 @@ SingleExecutor::SingleExecutor(uint32_t taskLimit) _wakeupConsumerAt(0), _producerNeedWakeupAt(0), _wp(0), + _watermark(std::min(_taskLimit.load(), watermark)), + _reactionTime(reactionTime), _closed(false) { + assert(taskLimit >= watermark); _thread.start(); } + SingleExecutor::~SingleExecutor() { shutdown(); sync(); @@ -62,7 +70,7 @@ SingleExecutor::execute(Task::UP task) { void SingleExecutor::setTaskLimit(uint32_t taskLimit) { - _wantedTaskLimit = vespalib::roundUp2inN(taskLimit); + _wantedTaskLimit = std::max(vespalib::roundUp2inN(taskLimit), size_t(_watermark)); } void @@ -102,10 +110,10 @@ SingleExecutor::run() { while (!_thread.stopped()) { drain_tasks(); _producerCondition.notify_all(); - _wakeupConsumerAt.store(_wp.load(std::memory_order_relaxed) + (_taskLimit.load(std::memory_order_relaxed) / 4), std::memory_order_relaxed); + _wakeupConsumerAt.store(_wp.load(std::memory_order_relaxed) + _watermark, std::memory_order_relaxed); Lock lock(_mutex); if (numTasks() <= 0) { - _consumerCondition.wait_for(lock, 10ms); + _consumerCondition.wait_for(lock, _reactionTime); } _wakeupConsumerAt.store(0, std::memory_order_relaxed); } @@ -144,7 +152,7 @@ SingleExecutor::wait_for_room(Lock & lock) { } _queueSize.add(numTasks()); while (numTasks() >= _taskLimit.load(std::memory_order_relaxed)) { - sleepProducer(lock, 10ms, wp - taskLimit/4); + sleepProducer(lock, _reactionTime, wp - _watermark); } } diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h index 3d759769ea3..a58128c15aa 100644 --- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h +++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h @@ -19,6 +19,7 @@ namespace vespalib { class SingleExecutor final : public vespalib::SyncableThreadExecutor, vespalib::Runnable { public: explicit SingleExecutor(uint32_t taskLimit); + SingleExecutor(uint32_t taskLimit, uint32_t watermark, duration reactionTime); ~SingleExecutor() override; Task::UP execute(Task::UP task) override; void setTaskLimit(uint32_t taskLimit) override; @@ -56,6 +57,8 @@ private: std::atomic _wakeupConsumerAt; std::atomic _producerNeedWakeupAt; std::atomic _wp; + const uint32_t _watermark; + const duration _reactionTime; bool _closed; }; -- cgit v1.2.3