diff options
6 files changed, 105 insertions, 28 deletions
diff --git a/staging_vespalib/src/tests/singleexecutor/singleexecutor_test.cpp b/staging_vespalib/src/tests/singleexecutor/singleexecutor_test.cpp index dd71380f64a..56352ff3c0d 100644 --- a/staging_vespalib/src/tests/singleexecutor/singleexecutor_test.cpp +++ b/staging_vespalib/src/tests/singleexecutor/singleexecutor_test.cpp @@ -30,6 +30,28 @@ TEST("test that all tasks are executed") { EXPECT_EQUAL(10000u, counter); } +TEST("test that executor can overflow") { + constexpr size_t NUM_TASKS = 1000; + std::atomic<uint64_t> counter(0); + vespalib::Gate gate; + SingleExecutor executor(sequenced_executor, 10, false, 1, 1ms); + executor.execute(makeLambdaTask([&gate] { gate.await();})); + + for(size_t i(0); i < NUM_TASKS; i++) { + executor.execute(makeLambdaTask([&counter, i] { + EXPECT_EQUAL(i, counter); + counter++; + })); + } + EXPECT_EQUAL(0u, counter); + ExecutorStats stats = executor.getStats(); + EXPECT_EQUAL(NUM_TASKS + 1, stats.acceptedTasks); + EXPECT_EQUAL(NUM_TASKS, stats.queueSize.max()); + gate.countDown(); + executor.sync(); + EXPECT_EQUAL(NUM_TASKS, counter); +} + void verifyResizeTaskLimit(bool up) { std::mutex lock; std::condition_variable cond; @@ -38,7 +60,7 @@ void verifyResizeTaskLimit(bool up) { constexpr uint32_t INITIAL = 20; const uint32_t INITIAL_2inN = roundUp2inN(INITIAL); double waterMarkRatio = 0.5; - SingleExecutor executor(sequenced_executor, INITIAL, INITIAL*waterMarkRatio, 10ms); + SingleExecutor executor(sequenced_executor, INITIAL, true, INITIAL*waterMarkRatio, 10ms); EXPECT_EQUAL(INITIAL_2inN, executor.getTaskLimit()); EXPECT_EQUAL(uint32_t(INITIAL_2inN*waterMarkRatio), executor.get_watermark()); diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp index 76b0235301b..58ae862f7c6 100644 --- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp @@ -67,7 +67,7 @@ SequencedTaskExecutor::create(Runnable::init_fun_t func, uint32_t threads, uint3 for (uint32_t id = 0; id < threads; ++id) { if (optimize == OptimizeFor::THROUGHPUT) { uint32_t watermark = (kindOfWatermark == 0) ? taskLimit / 10 : kindOfWatermark; - executors.push_back(std::make_unique<SingleExecutor>(func, taskLimit, watermark, 100ms)); + executors.push_back(std::make_unique<SingleExecutor>(func, taskLimit, true, watermark, 100ms)); } else { executors.push_back(std::make_unique<BlockingThreadStackExecutor>(1, stackSize, taskLimit, func)); } diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp index a99bce0a705..4acfdb376c6 100644 --- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp @@ -7,12 +7,12 @@ namespace vespalib { SingleExecutor::SingleExecutor(init_fun_t func, uint32_t taskLimit) - : SingleExecutor(func, taskLimit, taskLimit/10, 100ms) + : SingleExecutor(func, taskLimit, true, taskLimit/10, 100ms) { } -SingleExecutor::SingleExecutor(init_fun_t func, uint32_t taskLimit, uint32_t watermark, duration reactionTime) - : _watermarkRatio(watermark < taskLimit ? double(watermark) / taskLimit : 1.0), - _taskLimit(vespalib::roundUp2inN(taskLimit)), +SingleExecutor::SingleExecutor(init_fun_t func, uint32_t reservedQueueSize, bool isQueueSizeHard, uint32_t watermark, duration reactionTime) + : _watermarkRatio(watermark < reservedQueueSize ? double(watermark) / reservedQueueSize : 1.0), + _taskLimit(vespalib::roundUp2inN(reservedQueueSize)), _wantedTaskLimit(_taskLimit.load()), _rp(0), _tasks(std::make_unique<Task::UP[]>(_taskLimit)), @@ -30,9 +30,13 @@ SingleExecutor::SingleExecutor(init_fun_t func, uint32_t taskLimit, uint32_t wat _wp(0), _watermark(_taskLimit.load()*_watermarkRatio), _reactionTime(reactionTime), - _closed(false) + _closed(false), + _overflow() { - assert(taskLimit >= watermark); + assert(reservedQueueSize >= watermark); + if ( ! isQueueSizeHard) { + _overflow = std::make_unique<ArrayQueue<Task::UP>>(); + } _thread.start(); } @@ -56,16 +60,16 @@ SingleExecutor::sleepProducer(Lock & lock, duration maxWaitTime, uint64_t wakeup Executor::Task::UP SingleExecutor::execute(Task::UP task) { - uint64_t wp; + uint64_t wp(0); { Lock guard(_mutex); if (_closed) { return task; } - wait_for_room(guard); - wp = _wp.load(std::memory_order_relaxed); - _tasks[index(wp)] = std::move(task); - _wp.store(wp + 1, std::memory_order_release); + task = wait_for_room_or_put_in_overflow_Q(guard, std::move(task)); + if (task) { + wp = move_to_main_q(guard, std::move(task)); + } } if (wp == _wakeupConsumerAt.load(std::memory_order_relaxed)) { _consumerCondition.notify_one(); @@ -73,6 +77,14 @@ SingleExecutor::execute(Task::UP task) { return task; } +uint64_t +SingleExecutor::move_to_main_q(Lock &, Task::UP task) { + uint64_t wp = _wp.load(std::memory_order_relaxed); + _tasks[index(wp)] = std::move(task); + _wp.store(wp + 1, std::memory_order_release); + return wp; +} + void SingleExecutor::setTaskLimit(uint32_t taskLimit) { _wantedTaskLimit = vespalib::roundUp2inN(taskLimit); @@ -97,7 +109,7 @@ SingleExecutor::wakeup() { SingleExecutor & SingleExecutor::sync() { Lock lock(_mutex); - uint64_t wp = _wp.load(std::memory_order_relaxed); + uint64_t wp = _wp.load(std::memory_order_relaxed) + num_tasks_in_overflow_q(); while (wp > _rp.load(std::memory_order_acquire)) { _consumerCondition.notify_one(); sleepProducer(lock, 100us, wp); @@ -134,6 +146,22 @@ void SingleExecutor::drain_tasks() { while (numTasks() > 0) { run_tasks_till(_wp.load(std::memory_order_acquire)); + move_overflow_to_main_q(); + } +} + +void +SingleExecutor::move_overflow_to_main_q() +{ + if ( ! _overflow) return; + Lock guard(_mutex); + move_overflow_to_main_q(guard); +} +void +SingleExecutor::move_overflow_to_main_q(Lock & guard) { + while ( !_overflow->empty() && num_task_in_main_q() < _taskLimit.load(std::memory_order_relaxed)) { + move_to_main_q(guard, std::move(_overflow->front())); + _overflow->pop(); } } @@ -151,26 +179,42 @@ SingleExecutor::run_tasks_till(uint64_t available) { } } -void -SingleExecutor::wait_for_room(Lock & lock) { +Executor::Task::UP +SingleExecutor::wait_for_room_or_put_in_overflow_Q(Lock & guard, Task::UP task) { uint64_t wp = _wp.load(std::memory_order_relaxed); uint64_t taskLimit = _taskLimit.load(std::memory_order_relaxed); if (taskLimit != _wantedTaskLimit.load(std::memory_order_relaxed)) { - drain(lock); + drain(guard); _tasks = std::make_unique<Task::UP[]>(_wantedTaskLimit); _taskLimit = _wantedTaskLimit.load(); _watermark = _taskLimit * _watermarkRatio; } - _queueSize.add(numTasks()); - while (numTasks() >= _taskLimit.load(std::memory_order_relaxed)) { - sleepProducer(lock, _reactionTime, wp - get_watermark()); + uint64_t numTaskInQ = numTasks(); + _queueSize.add(numTaskInQ); + if (numTaskInQ >= _taskLimit.load(std::memory_order_relaxed)) { + if (_overflow) { + _overflow->push(std::move(task)); + } else { + while (numTasks() >= _taskLimit.load(std::memory_order_relaxed)) { + sleepProducer(guard, _reactionTime, wp - get_watermark()); + } + } + } else { + if (_overflow && !_overflow->empty()) { + _overflow->push(std::move(task)); + } } + if (_overflow && !_overflow->empty()) { + assert(!task); + move_overflow_to_main_q(guard); + } + return task; } ExecutorStats SingleExecutor::getStats() { Lock lock(_mutex); - uint64_t accepted = _wp.load(std::memory_order_relaxed); + uint64_t accepted = _wp.load(std::memory_order_relaxed) + (_overflow ? _overflow->size() : 0); steady_time now = steady_clock::now(); _idleTracker.was_idle(_threadIdleTracker.reset(now)); ExecutorStats stats(_queueSize, (accepted - _lastAccepted), 0, _wakeupCount); diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h index e76e3f17a41..9ec107639bc 100644 --- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h +++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h @@ -5,6 +5,7 @@ #include <vespa/vespalib/util/threadexecutor.h> #include <vespa/vespalib/util/thread.h> #include <vespa/vespalib/util/time.h> +#include <vespa/vespalib/util/arrayqueue.hpp> #include <vespa/vespalib/util/executor_idle_tracking.h> #include <thread> #include <atomic> @@ -19,8 +20,8 @@ namespace vespalib { */ class SingleExecutor final : public vespalib::SyncableThreadExecutor, vespalib::Runnable { public: - SingleExecutor(init_fun_t func, uint32_t taskLimit); - SingleExecutor(init_fun_t func, uint32_t taskLimit, uint32_t watermark, duration reactionTime); + SingleExecutor(init_fun_t func, uint32_t reservedQueueSize); + SingleExecutor(init_fun_t func, uint32_t reservedQueueSize, bool isQueueSizeHard, uint32_t watermark, duration reactionTime); ~SingleExecutor() override; Task::UP execute(Task::UP task) override; void setTaskLimit(uint32_t taskLimit) override; @@ -39,12 +40,21 @@ private: void drain_tasks(); void sleepProducer(Lock & guard, duration maxWaitTime, uint64_t wakeupAt); void run_tasks_till(uint64_t available); - void wait_for_room(Lock & guard); + Task::UP wait_for_room_or_put_in_overflow_Q(Lock & guard, Task::UP task); + uint64_t move_to_main_q(Lock & guard, Task::UP task); + void move_overflow_to_main_q(); + void move_overflow_to_main_q(Lock & guard); uint64_t index(uint64_t counter) const { return counter & (_taskLimit.load(std::memory_order_relaxed) - 1); } uint64_t numTasks() const { + return num_task_in_main_q() + num_tasks_in_overflow_q(); + } + uint64_t num_tasks_in_overflow_q() const { + return _overflow ? _overflow->size() : 0; + } + uint64_t num_task_in_main_q() const { return _wp.load(std::memory_order_relaxed) - _rp.load(std::memory_order_acquire); } const double _watermarkRatio; @@ -67,6 +77,7 @@ private: std::atomic<uint32_t> _watermark; const duration _reactionTime; bool _closed; + std::unique_ptr<ArrayQueue<Task::UP>> _overflow; }; } diff --git a/storage/src/vespa/storage/persistence/asynchandler.cpp b/storage/src/vespa/storage/persistence/asynchandler.cpp index 3d24ee87879..5dafd9c5eda 100644 --- a/storage/src/vespa/storage/persistence/asynchandler.cpp +++ b/storage/src/vespa/storage/persistence/asynchandler.cpp @@ -113,7 +113,7 @@ class UnrevertableRemoveEntryProcessor : public BucketProcessor::EntryProcessor public: using DocumentIdsAndTimeStamps = std::vector<std::pair<spi::Timestamp, spi::DocumentId>>; UnrevertableRemoveEntryProcessor(DocumentIdsAndTimeStamps & to_remove) - : _to_remove(to_remove) + : _to_remove(to_remove) {} void process(spi::DocEntry& entry) override { diff --git a/vespalib/src/vespa/vespalib/util/arrayqueue.hpp b/vespalib/src/vespa/vespalib/util/arrayqueue.hpp index 73e70e7fd89..8f3dd8ab006 100644 --- a/vespalib/src/vespa/vespalib/util/arrayqueue.hpp +++ b/vespalib/src/vespa/vespalib/util/arrayqueue.hpp @@ -2,11 +2,11 @@ #pragma once -#include <stdint.h> -#include <stdlib.h> +#include "traits.h" +#include <cstdint> +#include <cstdlib> #include <cassert> #include <algorithm> -#include "traits.h" namespace vespalib { |