diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-02-26 05:52:53 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2020-02-27 21:48:14 +0000 |
commit | e114a0390b0aeb493a922ed3f90d0b90c7ad3c7f (patch) | |
tree | 6b0c7dd132c347fe16cba0f5a7e299d8dcfba1ff /staging_vespalib/src | |
parent | d7fb5fac283eef2c09eeaabd94288fa123e9f94c (diff) |
Add single threaded thoughput optimized executor with high and low watermark at 25% / 75%.
Diffstat (limited to 'staging_vespalib/src')
5 files changed, 268 insertions, 0 deletions
diff --git a/staging_vespalib/src/tests/singleexecutor/CMakeLists.txt b/staging_vespalib/src/tests/singleexecutor/CMakeLists.txt new file mode 100644 index 00000000000..c5d42d2c8c5 --- /dev/null +++ b/staging_vespalib/src/tests/singleexecutor/CMakeLists.txt @@ -0,0 +1,8 @@ +# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_executable(staging_vespalib_singleexecutor_test_app TEST + SOURCES + singleexecutor_test.cpp + DEPENDS + staging_vespalib +) +vespa_add_test(NAME staging_vespalib_singleexecutor_test_app COMMAND staging_vespalib_singleexecutor_test_app) diff --git a/staging_vespalib/src/tests/singleexecutor/singleexecutor_test.cpp b/staging_vespalib/src/tests/singleexecutor/singleexecutor_test.cpp new file mode 100644 index 00000000000..5dacaa5d204 --- /dev/null +++ b/staging_vespalib/src/tests/singleexecutor/singleexecutor_test.cpp @@ -0,0 +1,80 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/vespalib/testkit/testapp.h> + +#include <vespa/vespalib/util/singleexecutor.h> +#include <vespa/vespalib/util/lambdatask.h> +#include <atomic> + +using namespace vespalib; + +TEST("test that all tasks are executed") { + + std::atomic<uint64_t> counter(0); + SingleExecutor executor(10); + + for (uint64_t i(0); i < 10; i++) { + executor.execute(makeLambdaTask([&counter] {counter++;})); + } + executor.sync(); + EXPECT_EQUAL(10u, counter); + + counter = 0; + for (uint64_t i(0); i < 10000; i++) { + executor.execute(makeLambdaTask([&counter] {counter++;})); + } + executor.sync(); + EXPECT_EQUAL(10000u, counter); +} + +void verifyResizeTaskLimit(bool up) { + Monitor lock; + std::atomic<uint64_t> started(0); + std::atomic<uint64_t> allowed(0); + SingleExecutor executor(10); + + uint32_t targetTaskLimit = up ? 20 : 5; + uint32_t roundedTaskLimit = roundUp2inN(targetTaskLimit); + EXPECT_NOT_EQUAL(16u, roundedTaskLimit); + + for (uint64_t i(0); i < 10; i++) { + executor.execute(makeLambdaTask([&lock, &started, &allowed] { + started++; + MonitorGuard guard(lock); + while (allowed < started) { + guard.wait(1ms); + } + })); + } + while (started < 1); + EXPECT_EQUAL(1u, started); + executor.setTaskLimit(targetTaskLimit); + EXPECT_EQUAL(16u, executor.getTaskLimit()); + allowed = 5; + while (started < 6); + EXPECT_EQUAL(6u, started); + EXPECT_EQUAL(16u, executor.getTaskLimit()); + allowed = 10; + while (started < 10); + EXPECT_EQUAL(10u, started); + EXPECT_EQUAL(16u, executor.getTaskLimit()); + executor.execute(makeLambdaTask([&lock, &started, &allowed] { + started++; + MonitorGuard guard(lock); + while (allowed < started) { + guard.wait(1ms); + } + })); + while (started < 11); + EXPECT_EQUAL(11u, started); + EXPECT_EQUAL(roundedTaskLimit, executor.getTaskLimit()); + allowed = 11; +} +TEST("test that resizing up and down works") { + TEST_DO(verifyResizeTaskLimit(true)); + TEST_DO(verifyResizeTaskLimit(false)); + + +} + +TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/staging_vespalib/src/vespa/vespalib/util/CMakeLists.txt b/staging_vespalib/src/vespa/vespalib/util/CMakeLists.txt index 71364a813f6..ba03b77c941 100644 --- a/staging_vespalib/src/vespa/vespalib/util/CMakeLists.txt +++ b/staging_vespalib/src/vespa/vespalib/util/CMakeLists.txt @@ -17,6 +17,7 @@ vespa_add_library(staging_vespalib_vespalib_util OBJECT rusage.cpp shutdownguard.cpp scheduledexecutor.cpp + singleexecutor.cpp xmlserializable.cpp xmlstream.cpp DEPENDS diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp new file mode 100644 index 00000000000..59cc9a39957 --- /dev/null +++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp @@ -0,0 +1,124 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "singleexecutor.h" +#include <vespa/vespalib/util/time.h> + +namespace vespalib { + +SingleExecutor::SingleExecutor(uint32_t taskLimit) + : _taskLimit(vespalib::roundUp2inN(taskLimit)), + _wantedTaskLimit(_taskLimit.load()), + _rp(0), + _tasks(std::make_unique<Task::UP[]>(_taskLimit)), + _monitor(), + _thread(*this), + _lastAccepted(0), + _maxPending(0), + _wakeupConsumerAt(0), + _producerNeedWakeup(false), + _wp(0) +{ + _thread.start(); +} +SingleExecutor::~SingleExecutor() { + sync(); + _thread.stop().join(); +} + +size_t +SingleExecutor::getNumThreads() const { + return 1; +} + +Executor::Task::UP +SingleExecutor::execute(Task::UP task) { + wait_for_room(); + uint64_t wp = _wp.load(std::memory_order_relaxed); + _tasks[index(wp)] = std::move(task); + _wp.store(wp + 1, std::memory_order_relaxed); + if (wp == _wakeupConsumerAt.load(std::memory_order_relaxed)) { + MonitorGuard guard(_monitor); + guard.signal(); + } + return Task::UP(); +} + +void +SingleExecutor::setTaskLimit(uint32_t taskLimit) { + _wantedTaskLimit = vespalib::roundUp2inN(taskLimit); +} + +SingleExecutor & +SingleExecutor::sync() { + uint64_t wp = _wp.load(std::memory_order_relaxed); + while (wp > _rp.load(std::memory_order_relaxed)) { + std::this_thread::sleep_for(1ms); + } + return *this; +} + +void +SingleExecutor::run() { + while (!_thread.stopped()) { + drain_tasks(); + _wakeupConsumerAt.store(_wp.load(std::memory_order_relaxed) + (_taskLimit.load(std::memory_order_relaxed) >> 2), std::memory_order_relaxed); + MonitorGuard guard(_monitor); + guard.wait(1ms); + _wakeupConsumerAt.store(0, std::memory_order_relaxed); + } +} + +void +SingleExecutor::drain_tasks() { + while (numTasks() > 0) { + run_tasks_till(_wp.load(std::memory_order_relaxed)); + } +} + +void +SingleExecutor::run_tasks_till(uint64_t available) { + uint64_t consumed = _rp.load(std::memory_order_relaxed); + uint64_t left = available - consumed; + if (_maxPending.load(std::memory_order_relaxed) < left) { + _maxPending.store(left, std::memory_order_relaxed); + } + uint64_t wakeupLimit = _producerNeedWakeup.load(std::memory_order_relaxed) + ? (available - (left >> 2)) + : 0; + while (consumed < available) { + Task::UP task = std::move(_tasks[index(consumed)]); + task->run(); + _rp.store(++consumed, std::memory_order_relaxed); + if (wakeupLimit == consumed) { + MonitorGuard guard(_monitor); + guard.signal(); + } + } +} + +void +SingleExecutor::wait_for_room() { + if (_taskLimit.load(std::memory_order_relaxed) != _wantedTaskLimit.load(std::memory_order_relaxed)) { + sync(); + _tasks = std::make_unique<Task::UP[]>(_wantedTaskLimit); + _taskLimit = _wantedTaskLimit.load(); + } + while (numTasks() >= _taskLimit.load(std::memory_order_relaxed)) { + _producerNeedWakeup.store(true, std::memory_order_relaxed); + MonitorGuard guard(_monitor); + guard.wait(1ms); + _producerNeedWakeup.store(false, std::memory_order_relaxed); + } +} + +ThreadExecutor::Stats +SingleExecutor::getStats() { + uint64_t accepted = _wp.load(std::memory_order_relaxed); + Stats stats(_maxPending, (accepted - _lastAccepted), 0); + _lastAccepted = accepted; + _maxPending = 0; + return stats; +} + + +} diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h new file mode 100644 index 00000000000..13520efa3d8 --- /dev/null +++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h @@ -0,0 +1,55 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <vespa/vespalib/util/threadexecutor.h> +#include <vespa/vespalib/util/thread.h> +#include <thread> +#include <atomic> + +namespace vespalib { + +/** + * Has a single thread consuming tasks from a fixed size ringbuffer. + * Made for throughput where the producer has no interaction with the consumer and + * it is hence very cheap to produce a task. The consumer wakes up once every ms to see if there are + * anything to consumer. It uses a lock free ringbuffer, but requires only a single producer. + * That must be enforced on the outside. + */ +class SingleExecutor final : public vespalib::SyncableThreadExecutor, vespalib::Runnable { +public: + explicit SingleExecutor(uint32_t taskLimit); + ~SingleExecutor() override; + Task::UP execute(Task::UP task) override; + void setTaskLimit(uint32_t taskLimit) override; + SingleExecutor & sync() override; + size_t getNumThreads() const override; + uint32_t getTaskLimit() const { return _taskLimit.load(std::memory_order_relaxed); } + Stats getStats() override; + +private: + void run() override; + void drain_tasks(); + void run_tasks_till(uint64_t available); + void wait_for_room(); + uint64_t index(uint64_t counter) const { + return counter & (_taskLimit.load(std::memory_order_relaxed) - 1); + } + + uint64_t numTasks() const { + return _wp.load(std::memory_order_relaxed) - _rp.load(std::memory_order_relaxed); + } + std::atomic<uint32_t> _taskLimit; + std::atomic<uint32_t> _wantedTaskLimit; + std::atomic<uint64_t> _rp; + std::unique_ptr<Task::UP[]> _tasks; + vespalib::Monitor _monitor; + vespalib::Thread _thread; + uint64_t _lastAccepted; + std::atomic<uint64_t> _maxPending; + std::atomic<uint64_t> _wakeupConsumerAt; + std::atomic<bool> _producerNeedWakeup; + std::atomic<uint64_t> _wp; +}; + +} |