diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-11-12 09:26:11 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2020-11-12 09:26:11 +0000 |
commit | 434fca0b458910329f63da49b9b1c84de232bf3f (patch) | |
tree | 6060535c3c2b13e40b37867b21230fdbdc7c80ec /staging_vespalib | |
parent | de23b574462e6931e6afd0906257f0bd7673f1f8 (diff) |
Name the threads so it is easier to see who is doing what.
Diffstat (limited to 'staging_vespalib')
7 files changed, 29 insertions, 19 deletions
diff --git a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp index ba82651f1fc..90067d86fc8 100644 --- a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp +++ b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp @@ -40,6 +40,8 @@ struct SimpleParams { } }; +VESPA_THREAD_STACK_TAG(sequenced_executor) + int main(int argc, char **argv) { SimpleParams params(argc, argv); bool use_adaptive_executor = params.next("use_adaptive_executor", 0); @@ -58,7 +60,7 @@ int main(int argc, char **argv) { auto optimize = optimize_for_throughput ? vespalib::Executor::OptimizeFor::THROUGHPUT : vespalib::Executor::OptimizeFor::LATENCY; - executor = SequencedTaskExecutor::create(num_strands, task_limit, optimize); + executor = SequencedTaskExecutor::create(sequenced_executor, num_strands, task_limit, optimize); } vespalib::Timer timer; for (size_t task_id = 0; task_id < num_tasks; ++task_id) { diff --git a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp index 29b25cd0471..21674b4e2d0 100644 --- a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp +++ b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp @@ -12,6 +12,8 @@ #include <vespa/log/log.h> LOG_SETUP("sequencedtaskexecutor_test"); +VESPA_THREAD_STACK_TAG(sequenced_executor) + namespace vespalib { @@ -20,7 +22,7 @@ class Fixture public: std::unique_ptr<ISequencedTaskExecutor> _threads; - Fixture() : _threads(SequencedTaskExecutor::create(2)) { } + Fixture() : _threads(SequencedTaskExecutor::create(sequenced_executor, 2)) { } }; @@ -233,12 +235,12 @@ TEST_F("require that executeLambda works", Fixture) } TEST("require that you get correct number of executors") { - auto seven = SequencedTaskExecutor::create(7); + auto seven = SequencedTaskExecutor::create(sequenced_executor, 7); EXPECT_EQUAL(7u, seven->getNumExecutors()); } TEST("require that you distribute well") { - auto seven = SequencedTaskExecutor::create(7); + auto seven = SequencedTaskExecutor::create(sequenced_executor, 7); const SequencedTaskExecutor & seq = dynamic_cast<const SequencedTaskExecutor &>(*seven); EXPECT_EQUAL(7u, seven->getNumExecutors()); EXPECT_EQUAL(97u, seq.getComponentHashSize()); @@ -251,21 +253,21 @@ TEST("require that you distribute well") { } TEST("Test creation of different types") { - auto iseq = SequencedTaskExecutor::create(1); + auto iseq = SequencedTaskExecutor::create(sequenced_executor, 1); EXPECT_EQUAL(1u, iseq->getNumExecutors()); auto * seq = dynamic_cast<SequencedTaskExecutor *>(iseq.get()); ASSERT_TRUE(seq != nullptr); - iseq = SequencedTaskExecutor::create(1, 1000, Executor::OptimizeFor::LATENCY); + iseq = SequencedTaskExecutor::create(sequenced_executor, 1, 1000, Executor::OptimizeFor::LATENCY); seq = dynamic_cast<SequencedTaskExecutor *>(iseq.get()); ASSERT_TRUE(seq != nullptr); - iseq = SequencedTaskExecutor::create(1, 1000, Executor::OptimizeFor::THROUGHPUT); + iseq = SequencedTaskExecutor::create(sequenced_executor, 1, 1000, Executor::OptimizeFor::THROUGHPUT); seq = dynamic_cast<SequencedTaskExecutor *>(iseq.get()); ASSERT_TRUE(seq != nullptr); - iseq = SequencedTaskExecutor::create(1, 1000, Executor::OptimizeFor::ADAPTIVE, 17); + iseq = SequencedTaskExecutor::create(sequenced_executor, 1, 1000, Executor::OptimizeFor::ADAPTIVE, 17); auto aseq = dynamic_cast<AdaptiveSequencedExecutor *>(iseq.get()); ASSERT_TRUE(aseq != nullptr); } diff --git a/staging_vespalib/src/tests/singleexecutor/singleexecutor_test.cpp b/staging_vespalib/src/tests/singleexecutor/singleexecutor_test.cpp index 622c9b9985f..5cc8862fc05 100644 --- a/staging_vespalib/src/tests/singleexecutor/singleexecutor_test.cpp +++ b/staging_vespalib/src/tests/singleexecutor/singleexecutor_test.cpp @@ -8,10 +8,12 @@ using namespace vespalib; +VESPA_THREAD_STACK_TAG(sequenced_executor) + TEST("test that all tasks are executed") { std::atomic<uint64_t> counter(0); - SingleExecutor executor(10); + SingleExecutor executor(sequenced_executor, 10); for (uint64_t i(0); i < 10; i++) { executor.execute(makeLambdaTask([&counter] {counter++;})); @@ -32,7 +34,7 @@ void verifyResizeTaskLimit(bool up) { std::condition_variable cond; std::atomic<uint64_t> started(0); std::atomic<uint64_t> allowed(0); - SingleExecutor executor(10); + SingleExecutor executor(sequenced_executor, 10); uint32_t targetTaskLimit = up ? 20 : 5; uint32_t roundedTaskLimit = roundUp2inN(targetTaskLimit); diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp index cf385275bfb..d1c6b1aba53 100644 --- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp @@ -27,7 +27,8 @@ isLazy(const std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>> & ex } std::unique_ptr<ISequencedTaskExecutor> -SequencedTaskExecutor::create(uint32_t threads, uint32_t taskLimit, OptimizeFor optimize, uint32_t kindOfWatermark, duration reactionTime) +SequencedTaskExecutor::create(vespalib::Runnable::init_fun_t func, uint32_t threads, uint32_t taskLimit, + OptimizeFor optimize, uint32_t kindOfWatermark, duration reactionTime) { if (optimize == OptimizeFor::ADAPTIVE) { size_t num_strands = std::min(taskLimit, threads*32); @@ -38,9 +39,9 @@ SequencedTaskExecutor::create(uint32_t threads, uint32_t taskLimit, OptimizeFor for (uint32_t id = 0; id < threads; ++id) { if (optimize == OptimizeFor::THROUGHPUT) { uint32_t watermark = kindOfWatermark == 0 ? taskLimit / 10 : kindOfWatermark; - executors->push_back(std::make_unique<SingleExecutor>(taskLimit, watermark, reactionTime)); + executors->push_back(std::make_unique<SingleExecutor>(func, taskLimit, watermark, reactionTime)); } else { - executors->push_back(std::make_unique<BlockingThreadStackExecutor>(1, stackSize, taskLimit)); + executors->push_back(std::make_unique<BlockingThreadStackExecutor>(1, stackSize, taskLimit, func)); } } return std::unique_ptr<ISequencedTaskExecutor>(new SequencedTaskExecutor(std::move(executors))); diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h index 180cd1cc6cc..050b00ef011 100644 --- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h +++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h @@ -3,6 +3,7 @@ #include "isequencedtaskexecutor.h" #include <vespa/vespalib/util/time.h> +#include <vespa/vespalib/util/runnable.h> namespace vespalib { @@ -33,7 +34,8 @@ public: * */ static std::unique_ptr<ISequencedTaskExecutor> - create(uint32_t threads, uint32_t taskLimit = 1000, OptimizeFor optimize = OptimizeFor::LATENCY, uint32_t kindOfWatermark = 0, duration reactionTime = 10ms); + create(vespalib::Runnable::init_fun_t, uint32_t threads, uint32_t taskLimit = 1000, + OptimizeFor optimize = OptimizeFor::LATENCY, uint32_t kindOfWatermark = 0, duration reactionTime = 10ms); /** * For testing only */ diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp index 795a7ef1ec3..96d8f267875 100644 --- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp @@ -5,11 +5,11 @@ namespace vespalib { -SingleExecutor::SingleExecutor(uint32_t taskLimit) - : SingleExecutor(taskLimit, taskLimit/10, 5ms) +SingleExecutor::SingleExecutor(init_fun_t func, uint32_t taskLimit) + : SingleExecutor(func, taskLimit, taskLimit/10, 5ms) { } -SingleExecutor::SingleExecutor(uint32_t taskLimit, uint32_t watermark, duration reactionTime) +SingleExecutor::SingleExecutor(init_fun_t func, uint32_t taskLimit, uint32_t watermark, duration reactionTime) : _taskLimit(vespalib::roundUp2inN(taskLimit)), _wantedTaskLimit(_taskLimit.load()), _rp(0), @@ -27,6 +27,7 @@ SingleExecutor::SingleExecutor(uint32_t taskLimit, uint32_t watermark, duration _reactionTime(reactionTime), _closed(false) { + (void) func; //TODO implement similar to ThreadStackExecutor assert(taskLimit >= watermark); _thread.start(); } diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h index 7b8a2741d87..58cec52b2b0 100644 --- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h +++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h @@ -18,8 +18,8 @@ namespace vespalib { */ class SingleExecutor final : public vespalib::SyncableThreadExecutor, vespalib::Runnable { public: - explicit SingleExecutor(uint32_t taskLimit); - SingleExecutor(uint32_t taskLimit, uint32_t watermark, duration reactionTime); + explicit SingleExecutor(init_fun_t func, uint32_t taskLimit); + SingleExecutor(init_fun_t func, uint32_t taskLimit, uint32_t watermark, duration reactionTime); ~SingleExecutor() override; Task::UP execute(Task::UP task) override; void setTaskLimit(uint32_t taskLimit) override; |