summaryrefslogtreecommitdiffstats
path: root/staging_vespalib
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-11-12 09:26:11 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2020-11-12 09:26:11 +0000
commit434fca0b458910329f63da49b9b1c84de232bf3f (patch)
tree6060535c3c2b13e40b37867b21230fdbdc7c80ec /staging_vespalib
parentde23b574462e6931e6afd0906257f0bd7673f1f8 (diff)
Name the threads so it is easier to see who is doing what.
Diffstat (limited to 'staging_vespalib')
-rw-r--r--staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp4
-rw-r--r--staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp16
-rw-r--r--staging_vespalib/src/tests/singleexecutor/singleexecutor_test.cpp6
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp7
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h4
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp7
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/singleexecutor.h4
7 files changed, 29 insertions, 19 deletions
diff --git a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp
index ba82651f1fc..90067d86fc8 100644
--- a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp
+++ b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp
@@ -40,6 +40,8 @@ struct SimpleParams {
}
};
+VESPA_THREAD_STACK_TAG(sequenced_executor)
+
int main(int argc, char **argv) {
SimpleParams params(argc, argv);
bool use_adaptive_executor = params.next("use_adaptive_executor", 0);
@@ -58,7 +60,7 @@ int main(int argc, char **argv) {
auto optimize = optimize_for_throughput
? vespalib::Executor::OptimizeFor::THROUGHPUT
: vespalib::Executor::OptimizeFor::LATENCY;
- executor = SequencedTaskExecutor::create(num_strands, task_limit, optimize);
+ executor = SequencedTaskExecutor::create(sequenced_executor, num_strands, task_limit, optimize);
}
vespalib::Timer timer;
for (size_t task_id = 0; task_id < num_tasks; ++task_id) {
diff --git a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp
index 29b25cd0471..21674b4e2d0 100644
--- a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp
+++ b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp
@@ -12,6 +12,8 @@
#include <vespa/log/log.h>
LOG_SETUP("sequencedtaskexecutor_test");
+VESPA_THREAD_STACK_TAG(sequenced_executor)
+
namespace vespalib {
@@ -20,7 +22,7 @@ class Fixture
public:
std::unique_ptr<ISequencedTaskExecutor> _threads;
- Fixture() : _threads(SequencedTaskExecutor::create(2)) { }
+ Fixture() : _threads(SequencedTaskExecutor::create(sequenced_executor, 2)) { }
};
@@ -233,12 +235,12 @@ TEST_F("require that executeLambda works", Fixture)
}
TEST("require that you get correct number of executors") {
- auto seven = SequencedTaskExecutor::create(7);
+ auto seven = SequencedTaskExecutor::create(sequenced_executor, 7);
EXPECT_EQUAL(7u, seven->getNumExecutors());
}
TEST("require that you distribute well") {
- auto seven = SequencedTaskExecutor::create(7);
+ auto seven = SequencedTaskExecutor::create(sequenced_executor, 7);
const SequencedTaskExecutor & seq = dynamic_cast<const SequencedTaskExecutor &>(*seven);
EXPECT_EQUAL(7u, seven->getNumExecutors());
EXPECT_EQUAL(97u, seq.getComponentHashSize());
@@ -251,21 +253,21 @@ TEST("require that you distribute well") {
}
TEST("Test creation of different types") {
- auto iseq = SequencedTaskExecutor::create(1);
+ auto iseq = SequencedTaskExecutor::create(sequenced_executor, 1);
EXPECT_EQUAL(1u, iseq->getNumExecutors());
auto * seq = dynamic_cast<SequencedTaskExecutor *>(iseq.get());
ASSERT_TRUE(seq != nullptr);
- iseq = SequencedTaskExecutor::create(1, 1000, Executor::OptimizeFor::LATENCY);
+ iseq = SequencedTaskExecutor::create(sequenced_executor, 1, 1000, Executor::OptimizeFor::LATENCY);
seq = dynamic_cast<SequencedTaskExecutor *>(iseq.get());
ASSERT_TRUE(seq != nullptr);
- iseq = SequencedTaskExecutor::create(1, 1000, Executor::OptimizeFor::THROUGHPUT);
+ iseq = SequencedTaskExecutor::create(sequenced_executor, 1, 1000, Executor::OptimizeFor::THROUGHPUT);
seq = dynamic_cast<SequencedTaskExecutor *>(iseq.get());
ASSERT_TRUE(seq != nullptr);
- iseq = SequencedTaskExecutor::create(1, 1000, Executor::OptimizeFor::ADAPTIVE, 17);
+ iseq = SequencedTaskExecutor::create(sequenced_executor, 1, 1000, Executor::OptimizeFor::ADAPTIVE, 17);
auto aseq = dynamic_cast<AdaptiveSequencedExecutor *>(iseq.get());
ASSERT_TRUE(aseq != nullptr);
}
diff --git a/staging_vespalib/src/tests/singleexecutor/singleexecutor_test.cpp b/staging_vespalib/src/tests/singleexecutor/singleexecutor_test.cpp
index 622c9b9985f..5cc8862fc05 100644
--- a/staging_vespalib/src/tests/singleexecutor/singleexecutor_test.cpp
+++ b/staging_vespalib/src/tests/singleexecutor/singleexecutor_test.cpp
@@ -8,10 +8,12 @@
using namespace vespalib;
+VESPA_THREAD_STACK_TAG(sequenced_executor)
+
TEST("test that all tasks are executed") {
std::atomic<uint64_t> counter(0);
- SingleExecutor executor(10);
+ SingleExecutor executor(sequenced_executor, 10);
for (uint64_t i(0); i < 10; i++) {
executor.execute(makeLambdaTask([&counter] {counter++;}));
@@ -32,7 +34,7 @@ void verifyResizeTaskLimit(bool up) {
std::condition_variable cond;
std::atomic<uint64_t> started(0);
std::atomic<uint64_t> allowed(0);
- SingleExecutor executor(10);
+ SingleExecutor executor(sequenced_executor, 10);
uint32_t targetTaskLimit = up ? 20 : 5;
uint32_t roundedTaskLimit = roundUp2inN(targetTaskLimit);
diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp
index cf385275bfb..d1c6b1aba53 100644
--- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp
@@ -27,7 +27,8 @@ isLazy(const std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>> & ex
}
std::unique_ptr<ISequencedTaskExecutor>
-SequencedTaskExecutor::create(uint32_t threads, uint32_t taskLimit, OptimizeFor optimize, uint32_t kindOfWatermark, duration reactionTime)
+SequencedTaskExecutor::create(vespalib::Runnable::init_fun_t func, uint32_t threads, uint32_t taskLimit,
+ OptimizeFor optimize, uint32_t kindOfWatermark, duration reactionTime)
{
if (optimize == OptimizeFor::ADAPTIVE) {
size_t num_strands = std::min(taskLimit, threads*32);
@@ -38,9 +39,9 @@ SequencedTaskExecutor::create(uint32_t threads, uint32_t taskLimit, OptimizeFor
for (uint32_t id = 0; id < threads; ++id) {
if (optimize == OptimizeFor::THROUGHPUT) {
uint32_t watermark = kindOfWatermark == 0 ? taskLimit / 10 : kindOfWatermark;
- executors->push_back(std::make_unique<SingleExecutor>(taskLimit, watermark, reactionTime));
+ executors->push_back(std::make_unique<SingleExecutor>(func, taskLimit, watermark, reactionTime));
} else {
- executors->push_back(std::make_unique<BlockingThreadStackExecutor>(1, stackSize, taskLimit));
+ executors->push_back(std::make_unique<BlockingThreadStackExecutor>(1, stackSize, taskLimit, func));
}
}
return std::unique_ptr<ISequencedTaskExecutor>(new SequencedTaskExecutor(std::move(executors)));
diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h
index 180cd1cc6cc..050b00ef011 100644
--- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h
+++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h
@@ -3,6 +3,7 @@
#include "isequencedtaskexecutor.h"
#include <vespa/vespalib/util/time.h>
+#include <vespa/vespalib/util/runnable.h>
namespace vespalib {
@@ -33,7 +34,8 @@ public:
*
*/
static std::unique_ptr<ISequencedTaskExecutor>
- create(uint32_t threads, uint32_t taskLimit = 1000, OptimizeFor optimize = OptimizeFor::LATENCY, uint32_t kindOfWatermark = 0, duration reactionTime = 10ms);
+ create(vespalib::Runnable::init_fun_t, uint32_t threads, uint32_t taskLimit = 1000,
+ OptimizeFor optimize = OptimizeFor::LATENCY, uint32_t kindOfWatermark = 0, duration reactionTime = 10ms);
/**
* For testing only
*/
diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
index 795a7ef1ec3..96d8f267875 100644
--- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
@@ -5,11 +5,11 @@
namespace vespalib {
-SingleExecutor::SingleExecutor(uint32_t taskLimit)
- : SingleExecutor(taskLimit, taskLimit/10, 5ms)
+SingleExecutor::SingleExecutor(init_fun_t func, uint32_t taskLimit)
+ : SingleExecutor(func, taskLimit, taskLimit/10, 5ms)
{ }
-SingleExecutor::SingleExecutor(uint32_t taskLimit, uint32_t watermark, duration reactionTime)
+SingleExecutor::SingleExecutor(init_fun_t func, uint32_t taskLimit, uint32_t watermark, duration reactionTime)
: _taskLimit(vespalib::roundUp2inN(taskLimit)),
_wantedTaskLimit(_taskLimit.load()),
_rp(0),
@@ -27,6 +27,7 @@ SingleExecutor::SingleExecutor(uint32_t taskLimit, uint32_t watermark, duration
_reactionTime(reactionTime),
_closed(false)
{
+ (void) func; //TODO implement similar to ThreadStackExecutor
assert(taskLimit >= watermark);
_thread.start();
}
diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
index 7b8a2741d87..58cec52b2b0 100644
--- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
+++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
@@ -18,8 +18,8 @@ namespace vespalib {
*/
class SingleExecutor final : public vespalib::SyncableThreadExecutor, vespalib::Runnable {
public:
- explicit SingleExecutor(uint32_t taskLimit);
- SingleExecutor(uint32_t taskLimit, uint32_t watermark, duration reactionTime);
+ explicit SingleExecutor(init_fun_t func, uint32_t taskLimit);
+ SingleExecutor(init_fun_t func, uint32_t taskLimit, uint32_t watermark, duration reactionTime);
~SingleExecutor() override;
Task::UP execute(Task::UP task) override;
void setTaskLimit(uint32_t taskLimit) override;