summaryrefslogtreecommitdiffstats
path: root/staging_vespalib
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2022-01-14 11:54:53 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2022-01-18 14:38:23 +0000
commitaeb3ab8725ee058c65036d582155edc20eff654b (patch)
tree216f666d0a69da61a0c9053b806e00762319671c /staging_vespalib
parent68da7d45fc5b6e6686963aed4432107a20c74b1f (diff)
Wire in control of whether taskLimit is hard.
Diffstat (limited to 'staging_vespalib')
-rw-r--r--staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp2
-rw-r--r--staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp36
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp17
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h4
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/singleexecutor.h1
5 files changed, 46 insertions, 14 deletions
diff --git a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp
index 3528cf74040..c609c538977 100644
--- a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp
+++ b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp
@@ -60,7 +60,7 @@ int main(int argc, char **argv) {
auto optimize = optimize_for_throughput
? vespalib::Executor::OptimizeFor::THROUGHPUT
: vespalib::Executor::OptimizeFor::LATENCY;
- executor = SequencedTaskExecutor::create(sequenced_executor, num_strands, task_limit, optimize);
+ executor = SequencedTaskExecutor::create(sequenced_executor, num_strands, task_limit, true, optimize);
}
vespalib::Timer timer;
for (size_t task_id = 0; task_id < num_tasks; ++task_id) {
diff --git a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp
index 243935d4013..705d6346e8c 100644
--- a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp
+++ b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp
@@ -2,7 +2,8 @@
#include <vespa/vespalib/util/sequencedtaskexecutor.h>
#include <vespa/vespalib/util/adaptive_sequenced_executor.h>
-
+#include <vespa/vespalib/util/blockingthreadstackexecutor.h>
+#include <vespa/vespalib/util/singleexecutor.h>
#include <vespa/vespalib/testkit/testapp.h>
#include <vespa/vespalib/test/insertion_operators.h>
@@ -22,7 +23,10 @@ class Fixture
public:
std::unique_ptr<ISequencedTaskExecutor> _threads;
- Fixture() : _threads(SequencedTaskExecutor::create(sequenced_executor, 2)) { }
+ Fixture(bool is_task_limit_hard = true) :
+ _threads(SequencedTaskExecutor::create(sequenced_executor, 2, 1000, is_task_limit_hard,
+ Executor::OptimizeFor::LATENCY))
+ { }
};
@@ -258,6 +262,28 @@ TEST("require that you get correct number of executors") {
EXPECT_EQUAL(7u, seven->getNumExecutors());
}
+void verifyHardLimitForLatency(bool expect_hard) {
+ auto sequenced = SequencedTaskExecutor::create(sequenced_executor, 1, 100, expect_hard, Executor::OptimizeFor::LATENCY);
+ const SequencedTaskExecutor & seq = dynamic_cast<const SequencedTaskExecutor &>(*sequenced);
+ EXPECT_EQUAL(expect_hard,nullptr != dynamic_cast<const BlockingThreadStackExecutor *>(seq.first_executor()));
+}
+
+void verifyHardLimitForThroughput(bool expect_hard) {
+ auto sequenced = SequencedTaskExecutor::create(sequenced_executor, 1, 100, expect_hard, Executor::OptimizeFor::THROUGHPUT);
+ const SequencedTaskExecutor & seq = dynamic_cast<const SequencedTaskExecutor &>(*sequenced);
+ const SingleExecutor * first = dynamic_cast<const SingleExecutor *>(seq.first_executor());
+ EXPECT_TRUE(first != nullptr);
+ EXPECT_EQUAL(expect_hard, first->isBlocking());
+}
+
+TEST("require that you can get executor with both hard and soft limit") {
+ verifyHardLimitForLatency(true);
+ verifyHardLimitForLatency(false);
+ verifyHardLimitForThroughput(true);
+ verifyHardLimitForThroughput(false);
+}
+
+
TEST("require that you distribute well") {
auto seven = SequencedTaskExecutor::create(sequenced_executor, 7);
const SequencedTaskExecutor & seq = dynamic_cast<const SequencedTaskExecutor &>(*seven);
@@ -307,15 +333,15 @@ TEST("Test creation of different types") {
auto * seq = dynamic_cast<SequencedTaskExecutor *>(iseq.get());
ASSERT_TRUE(seq != nullptr);
- iseq = SequencedTaskExecutor::create(sequenced_executor, 1, 1000, Executor::OptimizeFor::LATENCY);
+ iseq = SequencedTaskExecutor::create(sequenced_executor, 1, 1000, true, Executor::OptimizeFor::LATENCY);
seq = dynamic_cast<SequencedTaskExecutor *>(iseq.get());
ASSERT_TRUE(seq != nullptr);
- iseq = SequencedTaskExecutor::create(sequenced_executor, 1, 1000, Executor::OptimizeFor::THROUGHPUT);
+ iseq = SequencedTaskExecutor::create(sequenced_executor, 1, 1000, true, Executor::OptimizeFor::THROUGHPUT);
seq = dynamic_cast<SequencedTaskExecutor *>(iseq.get());
ASSERT_TRUE(seq != nullptr);
- iseq = SequencedTaskExecutor::create(sequenced_executor, 1, 1000, Executor::OptimizeFor::ADAPTIVE, 17);
+ iseq = SequencedTaskExecutor::create(sequenced_executor, 1, 1000, true, Executor::OptimizeFor::ADAPTIVE, 17);
auto aseq = dynamic_cast<AdaptiveSequencedExecutor *>(iseq.get());
ASSERT_TRUE(aseq != nullptr);
}
diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp
index 58ae862f7c6..88a679b4cdb 100644
--- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp
@@ -3,6 +3,7 @@
#include "sequencedtaskexecutor.h"
#include "adaptive_sequenced_executor.h"
#include "singleexecutor.h"
+#include <vespa/vespalib/util/threadstackexecutor.h>
#include <vespa/vespalib/util/blockingthreadstackexecutor.h>
#include <vespa/vespalib/util/size_literals.h>
#include <vespa/vespalib/stllike/hashtable.h>
@@ -46,17 +47,17 @@ SequencedTaskExecutor::create(Runnable::init_fun_t func, uint32_t threads) {
std::unique_ptr<ISequencedTaskExecutor>
SequencedTaskExecutor::create(Runnable::init_fun_t func, uint32_t threads, uint32_t taskLimit) {
- return create(func, threads, taskLimit, OptimizeFor::LATENCY);
+ return create(func, threads, taskLimit, true, OptimizeFor::LATENCY);
}
std::unique_ptr<ISequencedTaskExecutor>
-SequencedTaskExecutor::create(Runnable::init_fun_t func, uint32_t threads, uint32_t taskLimit, OptimizeFor optimize) {
- return create(func, threads, taskLimit, optimize, 0);
+SequencedTaskExecutor::create(Runnable::init_fun_t func, uint32_t threads, uint32_t taskLimit, bool is_task_limit_hard, OptimizeFor optimize) {
+ return create(func, threads, taskLimit, is_task_limit_hard, optimize, 0);
}
std::unique_ptr<ISequencedTaskExecutor>
SequencedTaskExecutor::create(Runnable::init_fun_t func, uint32_t threads, uint32_t taskLimit,
- OptimizeFor optimize, uint32_t kindOfWatermark)
+ bool is_task_limit_hard, OptimizeFor optimize, uint32_t kindOfWatermark)
{
if (optimize == OptimizeFor::ADAPTIVE) {
size_t num_strands = std::min(taskLimit, threads*32);
@@ -67,9 +68,13 @@ SequencedTaskExecutor::create(Runnable::init_fun_t func, uint32_t threads, uint3
for (uint32_t id = 0; id < threads; ++id) {
if (optimize == OptimizeFor::THROUGHPUT) {
uint32_t watermark = (kindOfWatermark == 0) ? taskLimit / 10 : kindOfWatermark;
- executors.push_back(std::make_unique<SingleExecutor>(func, taskLimit, true, watermark, 100ms));
+ executors.push_back(std::make_unique<SingleExecutor>(func, taskLimit, is_task_limit_hard, watermark, 100ms));
} else {
- executors.push_back(std::make_unique<BlockingThreadStackExecutor>(1, stackSize, taskLimit, func));
+ if (is_task_limit_hard) {
+ executors.push_back(std::make_unique<BlockingThreadStackExecutor>(1, stackSize, taskLimit, func));
+ } else {
+ executors.push_back(std::make_unique<ThreadStackExecutor>(1, stackSize, func));
+ }
}
}
return std::unique_ptr<ISequencedTaskExecutor>(new SequencedTaskExecutor(std::move(executors)));
diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h
index 91304a6a2e3..a4b1b82aacf 100644
--- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h
+++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h
@@ -35,10 +35,10 @@ public:
static std::unique_ptr<ISequencedTaskExecutor>
create(Runnable::init_fun_t func, uint32_t threads, uint32_t taskLimit);
static std::unique_ptr<ISequencedTaskExecutor>
- create(Runnable::init_fun_t func, uint32_t threads, uint32_t taskLimit, OptimizeFor optimize);
+ create(Runnable::init_fun_t func, uint32_t threads, uint32_t taskLimit, bool is_task_limit_hard, OptimizeFor optimize);
static std::unique_ptr<ISequencedTaskExecutor>
create(Runnable::init_fun_t func, uint32_t threads, uint32_t taskLimit,
- OptimizeFor optimize, uint32_t kindOfWatermark);
+ bool is_task_limit_hard, OptimizeFor optimize, uint32_t kindOfWatermark);
/**
* For testing only
*/
diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
index 4fdc217e701..dd755a76302 100644
--- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
+++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
@@ -33,6 +33,7 @@ public:
duration get_reaction_time() const { return _reactionTime; }
ExecutorStats getStats() override;
SingleExecutor & shutdown() override;
+ bool isBlocking() const { return !_overflow; }
private:
using Lock = std::unique_lock<std::mutex>;
void drain(Lock & lock);