summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author: Henning Baldersheim <balder@yahoo-inc.com>, 2022-01-15 08:08:06 +0100
committer: GitHub <noreply@github.com>, 2022-01-15 08:08:06 +0100
commit: 6eaa34d5ec1e51805ab0819650621bc23c89ddc4 (patch)
tree: fc9dd279bb9d49b6da6285d30a6fc000c1bdb947
parent: b019cb4efa7dbd5837f0da054feee4de7746bff8 (diff)
parent: 280745a1bd295898024de84e3eaf4eebf2bdc6b5 (diff)
Merge pull request #20797 from vespa-engine/balder/single-executor-with-unbound-q
- Add support for using an unbounded overflow queue, so that producers are not blocked when the reserved queue is full.
-rw-r--r-- staging_vespalib/src/tests/singleexecutor/singleexecutor_test.cpp | 24
-rw-r--r-- staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp | 2
-rw-r--r-- staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp | 96
-rw-r--r-- staging_vespalib/src/vespa/vespalib/util/singleexecutor.h | 20
-rw-r--r-- storage/src/vespa/storage/persistence/asynchandler.cpp | 2
-rw-r--r-- vespalib/src/vespa/vespalib/util/arrayqueue.hpp | 6
6 files changed, 120 insertions, 30 deletions
diff --git a/staging_vespalib/src/tests/singleexecutor/singleexecutor_test.cpp b/staging_vespalib/src/tests/singleexecutor/singleexecutor_test.cpp
index dd71380f64a..56352ff3c0d 100644
--- a/staging_vespalib/src/tests/singleexecutor/singleexecutor_test.cpp
+++ b/staging_vespalib/src/tests/singleexecutor/singleexecutor_test.cpp
@@ -30,6 +30,28 @@ TEST("test that all tasks are executed") {
EXPECT_EQUAL(10000u, counter);
}
+TEST("test that executor can overflow") {
+ constexpr size_t NUM_TASKS = 1000;
+ std::atomic<uint64_t> counter(0);
+ vespalib::Gate gate;
+ SingleExecutor executor(sequenced_executor, 10, false, 1, 1ms);
+ executor.execute(makeLambdaTask([&gate] { gate.await();}));
+
+ for(size_t i(0); i < NUM_TASKS; i++) {
+ executor.execute(makeLambdaTask([&counter, i] {
+ EXPECT_EQUAL(i, counter);
+ counter++;
+ }));
+ }
+ EXPECT_EQUAL(0u, counter);
+ ExecutorStats stats = executor.getStats();
+ EXPECT_EQUAL(NUM_TASKS + 1, stats.acceptedTasks);
+ EXPECT_EQUAL(NUM_TASKS, stats.queueSize.max());
+ gate.countDown();
+ executor.sync();
+ EXPECT_EQUAL(NUM_TASKS, counter);
+}
+
void verifyResizeTaskLimit(bool up) {
std::mutex lock;
std::condition_variable cond;
@@ -38,7 +60,7 @@ void verifyResizeTaskLimit(bool up) {
constexpr uint32_t INITIAL = 20;
const uint32_t INITIAL_2inN = roundUp2inN(INITIAL);
double waterMarkRatio = 0.5;
- SingleExecutor executor(sequenced_executor, INITIAL, INITIAL*waterMarkRatio, 10ms);
+ SingleExecutor executor(sequenced_executor, INITIAL, true, INITIAL*waterMarkRatio, 10ms);
EXPECT_EQUAL(INITIAL_2inN, executor.getTaskLimit());
EXPECT_EQUAL(uint32_t(INITIAL_2inN*waterMarkRatio), executor.get_watermark());
diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp
index 76b0235301b..58ae862f7c6 100644
--- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp
@@ -67,7 +67,7 @@ SequencedTaskExecutor::create(Runnable::init_fun_t func, uint32_t threads, uint3
for (uint32_t id = 0; id < threads; ++id) {
if (optimize == OptimizeFor::THROUGHPUT) {
uint32_t watermark = (kindOfWatermark == 0) ? taskLimit / 10 : kindOfWatermark;
- executors.push_back(std::make_unique<SingleExecutor>(func, taskLimit, watermark, 100ms));
+ executors.push_back(std::make_unique<SingleExecutor>(func, taskLimit, true, watermark, 100ms));
} else {
executors.push_back(std::make_unique<BlockingThreadStackExecutor>(1, stackSize, taskLimit, func));
}
diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
index a99bce0a705..21ed90c3d22 100644
--- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
@@ -7,12 +7,12 @@
namespace vespalib {
SingleExecutor::SingleExecutor(init_fun_t func, uint32_t taskLimit)
- : SingleExecutor(func, taskLimit, taskLimit/10, 100ms)
+ : SingleExecutor(func, taskLimit, true, taskLimit/10, 100ms)
{ }
-SingleExecutor::SingleExecutor(init_fun_t func, uint32_t taskLimit, uint32_t watermark, duration reactionTime)
- : _watermarkRatio(watermark < taskLimit ? double(watermark) / taskLimit : 1.0),
- _taskLimit(vespalib::roundUp2inN(taskLimit)),
+SingleExecutor::SingleExecutor(init_fun_t func, uint32_t reservedQueueSize, bool isQueueSizeHard, uint32_t watermark, duration reactionTime)
+ : _watermarkRatio(watermark < reservedQueueSize ? double(watermark) / reservedQueueSize : 1.0),
+ _taskLimit(vespalib::roundUp2inN(reservedQueueSize)),
_wantedTaskLimit(_taskLimit.load()),
_rp(0),
_tasks(std::make_unique<Task::UP[]>(_taskLimit)),
@@ -30,9 +30,13 @@ SingleExecutor::SingleExecutor(init_fun_t func, uint32_t taskLimit, uint32_t wat
_wp(0),
_watermark(_taskLimit.load()*_watermarkRatio),
_reactionTime(reactionTime),
- _closed(false)
+ _closed(false),
+ _overflow()
{
- assert(taskLimit >= watermark);
+ assert(reservedQueueSize >= watermark);
+ if ( ! isQueueSizeHard) {
+ _overflow = std::make_unique<ArrayQueue<Task::UP>>();
+ }
_thread.start();
}
@@ -62,10 +66,12 @@ SingleExecutor::execute(Task::UP task) {
if (_closed) {
return task;
}
- wait_for_room(guard);
- wp = _wp.load(std::memory_order_relaxed);
- _tasks[index(wp)] = std::move(task);
- _wp.store(wp + 1, std::memory_order_release);
+ task = wait_for_room_or_put_in_overflow_Q(guard, std::move(task));
+ if (task) {
+ wp = move_to_main_q(guard, std::move(task));
+ } else {
+ wp = _wp.load(std::memory_order_relaxed) + num_tasks_in_overflow_q(guard);
+ }
}
if (wp == _wakeupConsumerAt.load(std::memory_order_relaxed)) {
_consumerCondition.notify_one();
@@ -73,6 +79,24 @@ SingleExecutor::execute(Task::UP task) {
return task;
}
+uint64_t
+SingleExecutor::numTasks() {
+ if (_overflow) {
+ Lock guard(_mutex);
+ return num_tasks_in_main_q() + num_tasks_in_overflow_q(guard);
+ } else {
+ return num_tasks_in_main_q();
+ }
+}
+
+uint64_t
+SingleExecutor::move_to_main_q(Lock &, Task::UP task) {
+ uint64_t wp = _wp.load(std::memory_order_relaxed);
+ _tasks[index(wp)] = std::move(task);
+ _wp.store(wp + 1, std::memory_order_release);
+ return wp;
+}
+
void
SingleExecutor::setTaskLimit(uint32_t taskLimit) {
_wantedTaskLimit = vespalib::roundUp2inN(taskLimit);
@@ -81,7 +105,7 @@ SingleExecutor::setTaskLimit(uint32_t taskLimit) {
void
SingleExecutor::drain(Lock & lock) {
uint64_t wp = _wp.load(std::memory_order_relaxed);
- while (numTasks() > 0) {
+ while (numTasks(lock) > 0) {
_consumerCondition.notify_one();
sleepProducer(lock, 100us, wp);
}
@@ -97,7 +121,7 @@ SingleExecutor::wakeup() {
SingleExecutor &
SingleExecutor::sync() {
Lock lock(_mutex);
- uint64_t wp = _wp.load(std::memory_order_relaxed);
+ uint64_t wp = _wp.load(std::memory_order_relaxed) + num_tasks_in_overflow_q(lock);
while (wp > _rp.load(std::memory_order_acquire)) {
_consumerCondition.notify_one();
sleepProducer(lock, 100us, wp);
@@ -119,7 +143,7 @@ SingleExecutor::run() {
_producerCondition.notify_all();
_wakeupConsumerAt.store(_wp.load(std::memory_order_relaxed) + get_watermark(), std::memory_order_relaxed);
Lock lock(_mutex);
- if (numTasks() <= 0) {
+ if (numTasks(lock) <= 0) {
steady_time now = steady_clock::now();
_threadIdleTracker.set_idle(now);
_consumerCondition.wait_until(lock, now + _reactionTime);
@@ -134,6 +158,22 @@ void
SingleExecutor::drain_tasks() {
while (numTasks() > 0) {
run_tasks_till(_wp.load(std::memory_order_acquire));
+ move_overflow_to_main_q();
+ }
+}
+
+void
+SingleExecutor::move_overflow_to_main_q()
+{
+ if ( ! _overflow) return;
+ Lock guard(_mutex);
+ move_overflow_to_main_q(guard);
+}
+void
+SingleExecutor::move_overflow_to_main_q(Lock & guard) {
+ while ( !_overflow->empty() && num_tasks_in_main_q() < _taskLimit.load(std::memory_order_relaxed)) {
+ move_to_main_q(guard, std::move(_overflow->front()));
+ _overflow->pop();
}
}
@@ -151,26 +191,42 @@ SingleExecutor::run_tasks_till(uint64_t available) {
}
}
-void
-SingleExecutor::wait_for_room(Lock & lock) {
+Executor::Task::UP
+SingleExecutor::wait_for_room_or_put_in_overflow_Q(Lock & guard, Task::UP task) {
uint64_t wp = _wp.load(std::memory_order_relaxed);
uint64_t taskLimit = _taskLimit.load(std::memory_order_relaxed);
if (taskLimit != _wantedTaskLimit.load(std::memory_order_relaxed)) {
- drain(lock);
+ drain(guard);
_tasks = std::make_unique<Task::UP[]>(_wantedTaskLimit);
_taskLimit = _wantedTaskLimit.load();
_watermark = _taskLimit * _watermarkRatio;
}
- _queueSize.add(numTasks());
- while (numTasks() >= _taskLimit.load(std::memory_order_relaxed)) {
- sleepProducer(lock, _reactionTime, wp - get_watermark());
+ uint64_t numTaskInQ = numTasks(guard);
+ _queueSize.add(numTaskInQ);
+ if (numTaskInQ >= _taskLimit.load(std::memory_order_relaxed)) {
+ if (_overflow) {
+ _overflow->push(std::move(task));
+ } else {
+ while (numTasks(guard) >= _taskLimit.load(std::memory_order_relaxed)) {
+ sleepProducer(guard, _reactionTime, wp - get_watermark());
+ }
+ }
+ } else {
+ if (_overflow && !_overflow->empty()) {
+ _overflow->push(std::move(task));
+ }
+ }
+ if (_overflow && !_overflow->empty()) {
+ assert(!task);
+ move_overflow_to_main_q(guard);
}
+ return task;
}
ExecutorStats
SingleExecutor::getStats() {
Lock lock(_mutex);
- uint64_t accepted = _wp.load(std::memory_order_relaxed);
+ uint64_t accepted = _wp.load(std::memory_order_relaxed) + num_tasks_in_overflow_q(lock);
steady_time now = steady_clock::now();
_idleTracker.was_idle(_threadIdleTracker.reset(now));
ExecutorStats stats(_queueSize, (accepted - _lastAccepted), 0, _wakeupCount);
diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
index e76e3f17a41..4fdc217e701 100644
--- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
+++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
@@ -5,6 +5,7 @@
#include <vespa/vespalib/util/threadexecutor.h>
#include <vespa/vespalib/util/thread.h>
#include <vespa/vespalib/util/time.h>
+#include <vespa/vespalib/util/arrayqueue.hpp>
#include <vespa/vespalib/util/executor_idle_tracking.h>
#include <thread>
#include <atomic>
@@ -19,8 +20,8 @@ namespace vespalib {
*/
class SingleExecutor final : public vespalib::SyncableThreadExecutor, vespalib::Runnable {
public:
- SingleExecutor(init_fun_t func, uint32_t taskLimit);
- SingleExecutor(init_fun_t func, uint32_t taskLimit, uint32_t watermark, duration reactionTime);
+ SingleExecutor(init_fun_t func, uint32_t reservedQueueSize);
+ SingleExecutor(init_fun_t func, uint32_t reservedQueueSize, bool isQueueSizeHard, uint32_t watermark, duration reactionTime);
~SingleExecutor() override;
Task::UP execute(Task::UP task) override;
void setTaskLimit(uint32_t taskLimit) override;
@@ -39,12 +40,22 @@ private:
void drain_tasks();
void sleepProducer(Lock & guard, duration maxWaitTime, uint64_t wakeupAt);
void run_tasks_till(uint64_t available);
- void wait_for_room(Lock & guard);
+ Task::UP wait_for_room_or_put_in_overflow_Q(Lock & guard, Task::UP task);
+ uint64_t move_to_main_q(Lock & guard, Task::UP task);
+ void move_overflow_to_main_q();
+ void move_overflow_to_main_q(Lock & guard);
uint64_t index(uint64_t counter) const {
return counter & (_taskLimit.load(std::memory_order_relaxed) - 1);
}
- uint64_t numTasks() const {
+ uint64_t numTasks();
+ uint64_t numTasks(Lock & guard) const {
+ return num_tasks_in_main_q() + num_tasks_in_overflow_q(guard);
+ }
+ uint64_t num_tasks_in_overflow_q(Lock &) const {
+ return _overflow ? _overflow->size() : 0;
+ }
+ uint64_t num_tasks_in_main_q() const {
return _wp.load(std::memory_order_relaxed) - _rp.load(std::memory_order_acquire);
}
const double _watermarkRatio;
@@ -67,6 +78,7 @@ private:
std::atomic<uint32_t> _watermark;
const duration _reactionTime;
bool _closed;
+ std::unique_ptr<ArrayQueue<Task::UP>> _overflow;
};
}
diff --git a/storage/src/vespa/storage/persistence/asynchandler.cpp b/storage/src/vespa/storage/persistence/asynchandler.cpp
index 3d24ee87879..5dafd9c5eda 100644
--- a/storage/src/vespa/storage/persistence/asynchandler.cpp
+++ b/storage/src/vespa/storage/persistence/asynchandler.cpp
@@ -113,7 +113,7 @@ class UnrevertableRemoveEntryProcessor : public BucketProcessor::EntryProcessor
public:
using DocumentIdsAndTimeStamps = std::vector<std::pair<spi::Timestamp, spi::DocumentId>>;
UnrevertableRemoveEntryProcessor(DocumentIdsAndTimeStamps & to_remove)
- : _to_remove(to_remove)
+ : _to_remove(to_remove)
{}
void process(spi::DocEntry& entry) override {
diff --git a/vespalib/src/vespa/vespalib/util/arrayqueue.hpp b/vespalib/src/vespa/vespalib/util/arrayqueue.hpp
index 73e70e7fd89..8f3dd8ab006 100644
--- a/vespalib/src/vespa/vespalib/util/arrayqueue.hpp
+++ b/vespalib/src/vespa/vespalib/util/arrayqueue.hpp
@@ -2,11 +2,11 @@
#pragma once
-#include <stdint.h>
-#include <stdlib.h>
+#include "traits.h"
+#include <cstdint>
+#include <cstdlib>
#include <cassert>
#include <algorithm>
-#include "traits.h"
namespace vespalib {