summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2022-01-19 08:28:59 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2022-01-19 08:28:59 +0000
commit07d895ff30db9a10989e2d4a076e3015acccfa2a (patch)
tree9cc7fc1fbe928516543d15505b3544df59d89037
parent240bd7f0ccf80ebc45fc953eded77404d11fb586 (diff)
Let the Adaptive Executor have both soft and hard limit.
-rw-r--r--staging_vespalib/src/tests/sequencedtaskexecutor/adaptive_sequenced_executor_test.cpp6
-rw-r--r--staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp2
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp7
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h15
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp2
5 files changed, 21 insertions, 11 deletions
diff --git a/staging_vespalib/src/tests/sequencedtaskexecutor/adaptive_sequenced_executor_test.cpp b/staging_vespalib/src/tests/sequencedtaskexecutor/adaptive_sequenced_executor_test.cpp
index 5fc6d2a69ae..da31f1c1a79 100644
--- a/staging_vespalib/src/tests/sequencedtaskexecutor/adaptive_sequenced_executor_test.cpp
+++ b/staging_vespalib/src/tests/sequencedtaskexecutor/adaptive_sequenced_executor_test.cpp
@@ -18,7 +18,7 @@ class Fixture
public:
AdaptiveSequencedExecutor _threads;
- Fixture() : _threads(2, 2, 0, 1000) { }
+ Fixture(bool is_max_pending_hard=true) : _threads(2, 2, 0, 1000, is_max_pending_hard) { }
};
@@ -231,12 +231,12 @@ TEST_F("require that executeLambda works", Fixture)
}
TEST("require that you get correct number of executors") {
- AdaptiveSequencedExecutor seven(7, 1, 0, 10);
+ AdaptiveSequencedExecutor seven(7, 1, 0, 10, true);
EXPECT_EQUAL(7u, seven.getNumExecutors());
}
TEST("require that you distribute well") {
- AdaptiveSequencedExecutor seven(7, 1, 0, 10);
+ AdaptiveSequencedExecutor seven(7, 1, 0, 10, true);
EXPECT_EQUAL(7u, seven.getNumExecutors());
for (uint32_t id=0; id < 1000; id++) {
EXPECT_EQUAL(id%7, seven.getExecutorId(id).getId());
diff --git a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp
index c609c538977..0f7c82ef988 100644
--- a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp
+++ b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp
@@ -55,7 +55,7 @@ int main(int argc, char **argv) {
std::atomic<long> counter(0);
std::unique_ptr<ISequencedTaskExecutor> executor;
if (use_adaptive_executor) {
- executor = std::make_unique<AdaptiveSequencedExecutor>(num_strands, num_threads, max_waiting, task_limit);
+ executor = std::make_unique<AdaptiveSequencedExecutor>(num_strands, num_threads, max_waiting, task_limit, true);
} else {
auto optimize = optimize_for_throughput
? vespalib::Executor::OptimizeFor::THROUGHPUT
diff --git a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp
index 4d08e14375c..1e23ba15785 100644
--- a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp
@@ -95,7 +95,7 @@ AdaptiveSequencedExecutor::maybe_block_self(std::unique_lock<std::mutex> &lock)
while (_self.state == Self::State::BLOCKED) {
_self.cond.wait(lock);
}
- while ((_self.state == Self::State::OPEN) && (_self.pending_tasks >= _cfg.max_pending)) {
+ while ((_self.state == Self::State::OPEN) && _cfg.is_above_max_pending(_self.pending_tasks)) {
_self.state = Self::State::BLOCKED;
while (_self.state == Self::State::BLOCKED) {
_self.cond.wait(lock);
@@ -228,7 +228,8 @@ AdaptiveSequencedExecutor::worker_main()
}
AdaptiveSequencedExecutor::AdaptiveSequencedExecutor(size_t num_strands, size_t num_threads,
- size_t max_waiting, size_t max_pending)
+ size_t max_waiting, size_t max_pending,
+ bool is_max_pending_hard)
: ISequencedTaskExecutor(num_strands),
_thread_tools(std::make_unique<ThreadTools>(*this)),
_mutex(),
@@ -238,7 +239,7 @@ AdaptiveSequencedExecutor::AdaptiveSequencedExecutor(size_t num_strands, size_t
_self(),
_stats(),
_idleTracker(steady_clock::now()),
- _cfg(num_threads, max_waiting, max_pending)
+ _cfg(num_threads, max_waiting, max_pending, is_max_pending_hard)
{
_stats.queueSize.add(_self.pending_tasks);
_thread_tools->start(num_threads);
diff --git a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h
index ccf6ab977f3..d6244564fbd 100644
--- a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h
+++ b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h
@@ -51,14 +51,22 @@ private:
size_t max_waiting;
size_t max_pending;
size_t wakeup_limit;
+ bool is_max_pending_hard;
void set_max_pending(size_t max_pending_in) {
max_pending = std::max(1uL, max_pending_in);
wakeup_limit = std::max(1uL, size_t(max_pending * 0.9));
assert(wakeup_limit > 0);
assert(wakeup_limit <= max_pending);
}
- Config(size_t num_threads_in, size_t max_waiting_in, size_t max_pending_in)
- : num_threads(num_threads_in), max_waiting(max_waiting_in), max_pending(1000), wakeup_limit(900)
+ bool is_above_max_pending(size_t pending) {
+ return (pending >= max_pending) && is_max_pending_hard;
+ }
+ Config(size_t num_threads_in, size_t max_waiting_in, size_t max_pending_in, bool is_max_pending_hard_in)
+ : num_threads(num_threads_in),
+ max_waiting(max_waiting_in),
+ max_pending(1000),
+ wakeup_limit(900),
+ is_max_pending_hard(is_max_pending_hard_in)
{
assert(num_threads > 0);
set_max_pending(max_pending_in);
@@ -143,7 +151,8 @@ private:
void worker_main();
public:
AdaptiveSequencedExecutor(size_t num_strands, size_t num_threads,
- size_t max_waiting, size_t max_pending);
+ size_t max_waiting, size_t max_pending,
+ bool is_max_pending_hard);
~AdaptiveSequencedExecutor() override;
ExecutorId getExecutorId(uint64_t component) const override;
void executeTask(ExecutorId id, Task::UP task) override;
diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp
index 88a679b4cdb..59ffad88d09 100644
--- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp
@@ -61,7 +61,7 @@ SequencedTaskExecutor::create(Runnable::init_fun_t func, uint32_t threads, uint3
{
if (optimize == OptimizeFor::ADAPTIVE) {
size_t num_strands = std::min(taskLimit, threads*32);
- return std::make_unique<AdaptiveSequencedExecutor>(num_strands, threads, kindOfWatermark, taskLimit);
+ return std::make_unique<AdaptiveSequencedExecutor>(num_strands, threads, kindOfWatermark, taskLimit, is_task_limit_hard);
} else {
auto executors = std::vector<std::unique_ptr<SyncableThreadExecutor>>();
executors.reserve(threads);