diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2022-01-15 08:08:06 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-01-15 08:08:06 +0100 |
commit | 6eaa34d5ec1e51805ab0819650621bc23c89ddc4 (patch) | |
tree | fc9dd279bb9d49b6da6285d30a6fc000c1bdb947 | |
parent | b019cb4efa7dbd5837f0da054feee4de7746bff8 (diff) | |
parent | 280745a1bd295898024de84e3eaf4eebf2bdc6b5 (diff) |
Merge pull request #20797 from vespa-engine/balder/single-executor-with-unbound-q
- Add support for using an unbound Q -> nonblocking.
6 files changed, 120 insertions, 30 deletions
diff --git a/staging_vespalib/src/tests/singleexecutor/singleexecutor_test.cpp b/staging_vespalib/src/tests/singleexecutor/singleexecutor_test.cpp index dd71380f64a..56352ff3c0d 100644 --- a/staging_vespalib/src/tests/singleexecutor/singleexecutor_test.cpp +++ b/staging_vespalib/src/tests/singleexecutor/singleexecutor_test.cpp @@ -30,6 +30,28 @@ TEST("test that all tasks are executed") { EXPECT_EQUAL(10000u, counter); } +TEST("test that executor can overflow") { + constexpr size_t NUM_TASKS = 1000; + std::atomic<uint64_t> counter(0); + vespalib::Gate gate; + SingleExecutor executor(sequenced_executor, 10, false, 1, 1ms); + executor.execute(makeLambdaTask([&gate] { gate.await();})); + + for(size_t i(0); i < NUM_TASKS; i++) { + executor.execute(makeLambdaTask([&counter, i] { + EXPECT_EQUAL(i, counter); + counter++; + })); + } + EXPECT_EQUAL(0u, counter); + ExecutorStats stats = executor.getStats(); + EXPECT_EQUAL(NUM_TASKS + 1, stats.acceptedTasks); + EXPECT_EQUAL(NUM_TASKS, stats.queueSize.max()); + gate.countDown(); + executor.sync(); + EXPECT_EQUAL(NUM_TASKS, counter); +} + void verifyResizeTaskLimit(bool up) { std::mutex lock; std::condition_variable cond; @@ -38,7 +60,7 @@ void verifyResizeTaskLimit(bool up) { constexpr uint32_t INITIAL = 20; const uint32_t INITIAL_2inN = roundUp2inN(INITIAL); double waterMarkRatio = 0.5; - SingleExecutor executor(sequenced_executor, INITIAL, INITIAL*waterMarkRatio, 10ms); + SingleExecutor executor(sequenced_executor, INITIAL, true, INITIAL*waterMarkRatio, 10ms); EXPECT_EQUAL(INITIAL_2inN, executor.getTaskLimit()); EXPECT_EQUAL(uint32_t(INITIAL_2inN*waterMarkRatio), executor.get_watermark()); diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp index 76b0235301b..58ae862f7c6 100644 --- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp @@ -67,7 +67,7 @@ SequencedTaskExecutor::create(Runnable::init_fun_t func, uint32_t threads, uint3 for (uint32_t id = 0; id < threads; ++id) { if (optimize == OptimizeFor::THROUGHPUT) { uint32_t watermark = (kindOfWatermark == 0) ? taskLimit / 10 : kindOfWatermark; - executors.push_back(std::make_unique<SingleExecutor>(func, taskLimit, watermark, 100ms)); + executors.push_back(std::make_unique<SingleExecutor>(func, taskLimit, true, watermark, 100ms)); } else { executors.push_back(std::make_unique<BlockingThreadStackExecutor>(1, stackSize, taskLimit, func)); } diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp index a99bce0a705..21ed90c3d22 100644 --- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp @@ -7,12 +7,12 @@ namespace vespalib { SingleExecutor::SingleExecutor(init_fun_t func, uint32_t taskLimit) - : SingleExecutor(func, taskLimit, taskLimit/10, 100ms) + : SingleExecutor(func, taskLimit, true, taskLimit/10, 100ms) { } -SingleExecutor::SingleExecutor(init_fun_t func, uint32_t taskLimit, uint32_t watermark, duration reactionTime) - : _watermarkRatio(watermark < taskLimit ? double(watermark) / taskLimit : 1.0), - _taskLimit(vespalib::roundUp2inN(taskLimit)), +SingleExecutor::SingleExecutor(init_fun_t func, uint32_t reservedQueueSize, bool isQueueSizeHard, uint32_t watermark, duration reactionTime) + : _watermarkRatio(watermark < reservedQueueSize ? double(watermark) / reservedQueueSize : 1.0), + _taskLimit(vespalib::roundUp2inN(reservedQueueSize)), _wantedTaskLimit(_taskLimit.load()), _rp(0), _tasks(std::make_unique<Task::UP[]>(_taskLimit)), @@ -30,9 +30,13 @@ SingleExecutor::SingleExecutor(init_fun_t func, uint32_t taskLimit, uint32_t wat _wp(0), _watermark(_taskLimit.load()*_watermarkRatio), _reactionTime(reactionTime), - _closed(false) + _closed(false), + _overflow() { - assert(taskLimit >= watermark); + assert(reservedQueueSize >= watermark); + if ( ! isQueueSizeHard) { + _overflow = std::make_unique<ArrayQueue<Task::UP>>(); + } _thread.start(); } @@ -62,10 +66,12 @@ SingleExecutor::execute(Task::UP task) { if (_closed) { return task; } - wait_for_room(guard); - wp = _wp.load(std::memory_order_relaxed); - _tasks[index(wp)] = std::move(task); - _wp.store(wp + 1, std::memory_order_release); + task = wait_for_room_or_put_in_overflow_Q(guard, std::move(task)); + if (task) { + wp = move_to_main_q(guard, std::move(task)); + } else { + wp = _wp.load(std::memory_order_relaxed) + num_tasks_in_overflow_q(guard); + } } if (wp == _wakeupConsumerAt.load(std::memory_order_relaxed)) { _consumerCondition.notify_one(); @@ -73,6 +79,24 @@ SingleExecutor::execute(Task::UP task) { return task; } +uint64_t +SingleExecutor::numTasks() { + if (_overflow) { + Lock guard(_mutex); + return num_tasks_in_main_q() + num_tasks_in_overflow_q(guard); + } else { + return num_tasks_in_main_q(); + } +} + +uint64_t +SingleExecutor::move_to_main_q(Lock &, Task::UP task) { + uint64_t wp = _wp.load(std::memory_order_relaxed); + _tasks[index(wp)] = std::move(task); + _wp.store(wp + 1, std::memory_order_release); + return wp; +} + void SingleExecutor::setTaskLimit(uint32_t taskLimit) { _wantedTaskLimit = vespalib::roundUp2inN(taskLimit); @@ -81,7 +105,7 @@ SingleExecutor::setTaskLimit(uint32_t taskLimit) { void SingleExecutor::drain(Lock & lock) { uint64_t wp = _wp.load(std::memory_order_relaxed); - while (numTasks() > 0) { + while (numTasks(lock) > 0) { _consumerCondition.notify_one(); sleepProducer(lock, 100us, wp); } @@ -97,7 +121,7 @@ SingleExecutor::wakeup() { SingleExecutor & SingleExecutor::sync() { Lock lock(_mutex); - uint64_t wp = _wp.load(std::memory_order_relaxed); + uint64_t wp = _wp.load(std::memory_order_relaxed) + num_tasks_in_overflow_q(lock); while (wp > _rp.load(std::memory_order_acquire)) { _consumerCondition.notify_one(); sleepProducer(lock, 100us, wp); @@ -119,7 +143,7 @@ SingleExecutor::run() { _producerCondition.notify_all(); _wakeupConsumerAt.store(_wp.load(std::memory_order_relaxed) + get_watermark(), std::memory_order_relaxed); Lock lock(_mutex); - if (numTasks() <= 0) { + if (numTasks(lock) <= 0) { steady_time now = steady_clock::now(); _threadIdleTracker.set_idle(now); _consumerCondition.wait_until(lock, now + _reactionTime); @@ -134,6 +158,22 @@ void SingleExecutor::drain_tasks() { while (numTasks() > 0) { run_tasks_till(_wp.load(std::memory_order_acquire)); + move_overflow_to_main_q(); + } +} + +void +SingleExecutor::move_overflow_to_main_q() +{ + if ( ! _overflow) return; + Lock guard(_mutex); + move_overflow_to_main_q(guard); +} +void +SingleExecutor::move_overflow_to_main_q(Lock & guard) { + while ( !_overflow->empty() && num_tasks_in_main_q() < _taskLimit.load(std::memory_order_relaxed)) { + move_to_main_q(guard, std::move(_overflow->front())); + _overflow->pop(); } } @@ -151,26 +191,42 @@ SingleExecutor::run_tasks_till(uint64_t available) { } } -void -SingleExecutor::wait_for_room(Lock & lock) { +Executor::Task::UP +SingleExecutor::wait_for_room_or_put_in_overflow_Q(Lock & guard, Task::UP task) { uint64_t wp = _wp.load(std::memory_order_relaxed); uint64_t taskLimit = _taskLimit.load(std::memory_order_relaxed); if (taskLimit != _wantedTaskLimit.load(std::memory_order_relaxed)) { - drain(lock); + drain(guard); _tasks = std::make_unique<Task::UP[]>(_wantedTaskLimit); _taskLimit = _wantedTaskLimit.load(); _watermark = _taskLimit * _watermarkRatio; } - _queueSize.add(numTasks()); - while (numTasks() >= _taskLimit.load(std::memory_order_relaxed)) { - sleepProducer(lock, _reactionTime, wp - get_watermark()); + uint64_t numTaskInQ = numTasks(guard); + _queueSize.add(numTaskInQ); + if (numTaskInQ >= _taskLimit.load(std::memory_order_relaxed)) { + if (_overflow) { + _overflow->push(std::move(task)); + } else { + while (numTasks(guard) >= _taskLimit.load(std::memory_order_relaxed)) { + sleepProducer(guard, _reactionTime, wp - get_watermark()); + } + } + } else { + if (_overflow && !_overflow->empty()) { + _overflow->push(std::move(task)); + } + } + if (_overflow && !_overflow->empty()) { + assert(!task); + move_overflow_to_main_q(guard); } + return task; } ExecutorStats SingleExecutor::getStats() { Lock lock(_mutex); - uint64_t accepted = _wp.load(std::memory_order_relaxed); + uint64_t accepted = _wp.load(std::memory_order_relaxed) + num_tasks_in_overflow_q(lock); steady_time now = steady_clock::now(); _idleTracker.was_idle(_threadIdleTracker.reset(now)); ExecutorStats stats(_queueSize, (accepted - _lastAccepted), 0, _wakeupCount); diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h index e76e3f17a41..4fdc217e701 100644 --- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h +++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h @@ -5,6 +5,7 @@ #include <vespa/vespalib/util/threadexecutor.h> #include <vespa/vespalib/util/thread.h> #include <vespa/vespalib/util/time.h> +#include <vespa/vespalib/util/arrayqueue.hpp> #include <vespa/vespalib/util/executor_idle_tracking.h> #include <thread> #include <atomic> @@ -19,8 +20,8 @@ namespace vespalib { */ class SingleExecutor final : public vespalib::SyncableThreadExecutor, vespalib::Runnable { public: - SingleExecutor(init_fun_t func, uint32_t taskLimit); - SingleExecutor(init_fun_t func, uint32_t taskLimit, uint32_t watermark, duration reactionTime); + SingleExecutor(init_fun_t func, uint32_t reservedQueueSize); + SingleExecutor(init_fun_t func, uint32_t reservedQueueSize, bool isQueueSizeHard, uint32_t watermark, duration reactionTime); ~SingleExecutor() override; Task::UP execute(Task::UP task) override; void setTaskLimit(uint32_t taskLimit) override; @@ -39,12 +40,22 @@ private: void drain_tasks(); void sleepProducer(Lock & guard, duration maxWaitTime, uint64_t wakeupAt); void run_tasks_till(uint64_t available); - void wait_for_room(Lock & guard); + Task::UP wait_for_room_or_put_in_overflow_Q(Lock & guard, Task::UP task); + uint64_t move_to_main_q(Lock & guard, Task::UP task); + void move_overflow_to_main_q(); + void move_overflow_to_main_q(Lock & guard); uint64_t index(uint64_t counter) const { return counter & (_taskLimit.load(std::memory_order_relaxed) - 1); } - uint64_t numTasks() const { + uint64_t numTasks(); + uint64_t numTasks(Lock & guard) const { + return num_tasks_in_main_q() + num_tasks_in_overflow_q(guard); + } + uint64_t num_tasks_in_overflow_q(Lock &) const { + return _overflow ? _overflow->size() : 0; + } + uint64_t num_tasks_in_main_q() const { return _wp.load(std::memory_order_relaxed) - _rp.load(std::memory_order_acquire); } const double _watermarkRatio; @@ -67,6 +78,7 @@ private: std::atomic<uint32_t> _watermark; const duration _reactionTime; bool _closed; + std::unique_ptr<ArrayQueue<Task::UP>> _overflow; }; } diff --git a/storage/src/vespa/storage/persistence/asynchandler.cpp b/storage/src/vespa/storage/persistence/asynchandler.cpp index 3d24ee87879..5dafd9c5eda 100644 --- a/storage/src/vespa/storage/persistence/asynchandler.cpp +++ b/storage/src/vespa/storage/persistence/asynchandler.cpp @@ -113,7 +113,7 @@ class UnrevertableRemoveEntryProcessor : public BucketProcessor::EntryProcessor public: using DocumentIdsAndTimeStamps = std::vector<std::pair<spi::Timestamp, spi::DocumentId>>; UnrevertableRemoveEntryProcessor(DocumentIdsAndTimeStamps & to_remove) - : _to_remove(to_remove) + : _to_remove(to_remove) {} void process(spi::DocEntry& entry) override { diff --git a/vespalib/src/vespa/vespalib/util/arrayqueue.hpp b/vespalib/src/vespa/vespalib/util/arrayqueue.hpp index 73e70e7fd89..8f3dd8ab006 100644 --- a/vespalib/src/vespa/vespalib/util/arrayqueue.hpp +++ b/vespalib/src/vespa/vespalib/util/arrayqueue.hpp @@ -2,11 +2,11 @@ #pragma once -#include <stdint.h> -#include <stdlib.h> +#include "traits.h" +#include <cstdint> +#include <cstdlib> #include <cassert> #include <algorithm> -#include "traits.h" namespace vespalib { |