aboutsummaryrefslogtreecommitdiffstats
path: root/vespalib
diff options
context:
space:
mode:
authorHaavard <havardpe@yahoo-inc.com>2017-06-01 12:18:32 +0000
committerHaavard <havardpe@yahoo-inc.com>2017-06-01 12:18:32 +0000
commit7d24996180577083beb0335608b1cdf34e2ded41 (patch)
tree5c8ad9b7b3c23265a0f6d6c7f5d684b41f23216b /vespalib
parenta9772aff3ee6ce7b1f76322b0617890545289592 (diff)
added worker init wrapper support for executors
Diffstat (limited to 'vespalib')
-rw-r--r--vespalib/src/tests/executor/blockingthreadstackexecutor_test.cpp29
-rw-r--r--vespalib/src/tests/executor/threadstackexecutor_test.cpp29
-rw-r--r--vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.cpp13
-rw-r--r--vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.h6
-rw-r--r--vespalib/src/vespa/vespalib/util/threadstackexecutor.cpp14
-rw-r--r--vespalib/src/vespa/vespalib/util/threadstackexecutor.h7
-rw-r--r--vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp16
-rw-r--r--vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h33
8 files changed, 134 insertions, 13 deletions
diff --git a/vespalib/src/tests/executor/blockingthreadstackexecutor_test.cpp b/vespalib/src/tests/executor/blockingthreadstackexecutor_test.cpp
index f8136fe2a10..6b295497744 100644
--- a/vespalib/src/tests/executor/blockingthreadstackexecutor_test.cpp
+++ b/vespalib/src/tests/executor/blockingthreadstackexecutor_test.cpp
@@ -4,6 +4,7 @@
#include <vespa/vespalib/util/blockingthreadstackexecutor.h>
#include <vespa/vespalib/util/executor.h>
#include <vespa/vespalib/util/sync.h>
+#include <vespa/vespalib/util/backtrace.h>
#include <thread>
using namespace vespalib;
@@ -108,4 +109,32 @@ TEST_F("require that task limit can be decreased", Fixture(3, 3))
f.blockedExecuteAndWaitUntilFinished();
}
+vespalib::string get_worker_stack_trace(BlockingThreadStackExecutor &executor) {
+ struct StackTraceTask : public Executor::Task {
+ vespalib::string &trace;
+ explicit StackTraceTask(vespalib::string &t) : trace(t) {}
+ void run() override { trace = getStackTrace(0); }
+ };
+ vespalib::string trace;
+ executor.execute(std::make_unique<StackTraceTask>(trace));
+ executor.sync();
+ return trace;
+}
+
+VESPA_THREAD_STACK_TAG(my_stack_tag);
+
+TEST_F("require that executor has appropriate default thread stack tag", BlockingThreadStackExecutor(1, 128*1024, 10)) {
+ vespalib::string trace = get_worker_stack_trace(f1);
+ if (!EXPECT_TRUE(trace.find("unnamed_blocking_executor") != vespalib::string::npos)) {
+ fprintf(stderr, "%s\n", trace.c_str());
+ }
+}
+
+TEST_F("require that executor thread stack tag can be set", BlockingThreadStackExecutor(1, 128*1024, 10, my_stack_tag)) {
+ vespalib::string trace = get_worker_stack_trace(f1);
+ if (!EXPECT_TRUE(trace.find("my_stack_tag") != vespalib::string::npos)) {
+ fprintf(stderr, "%s\n", trace.c_str());
+ }
+}
+
TEST_MAIN() { TEST_RUN_ALL(); }
diff --git a/vespalib/src/tests/executor/threadstackexecutor_test.cpp b/vespalib/src/tests/executor/threadstackexecutor_test.cpp
index cb998a4323c..143a45c5f3f 100644
--- a/vespalib/src/tests/executor/threadstackexecutor_test.cpp
+++ b/vespalib/src/tests/executor/threadstackexecutor_test.cpp
@@ -4,6 +4,7 @@
#include <vespa/vespalib/util/threadstackexecutor.h>
#include <vespa/vespalib/util/sync.h>
+#include <vespa/vespalib/util/backtrace.h>
using namespace vespalib;
@@ -157,4 +158,32 @@ TEST_MT_F("require that threads can wait for a specific task count", 7, WaitStat
}
}
+vespalib::string get_worker_stack_trace(ThreadStackExecutor &executor) {
+ struct StackTraceTask : public Executor::Task {
+ vespalib::string &trace;
+ explicit StackTraceTask(vespalib::string &t) : trace(t) {}
+ void run() override { trace = getStackTrace(0); }
+ };
+ vespalib::string trace;
+ executor.execute(std::make_unique<StackTraceTask>(trace));
+ executor.sync();
+ return trace;
+}
+
+VESPA_THREAD_STACK_TAG(my_stack_tag);
+
+TEST_F("require that executor has appropriate default thread stack tag", ThreadStackExecutor(1, 128*1024)) {
+ vespalib::string trace = get_worker_stack_trace(f1);
+ if (!EXPECT_TRUE(trace.find("unnamed_nonblocking_executor") != vespalib::string::npos)) {
+ fprintf(stderr, "%s\n", trace.c_str());
+ }
+}
+
+TEST_F("require that executor thread stack tag can be set", ThreadStackExecutor(1, 128*1024, my_stack_tag)) {
+ vespalib::string trace = get_worker_stack_trace(f1);
+ if (!EXPECT_TRUE(trace.find("my_stack_tag") != vespalib::string::npos)) {
+ fprintf(stderr, "%s\n", trace.c_str());
+ }
+}
+
TEST_MAIN() { TEST_RUN_ALL(); }
diff --git a/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.cpp b/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.cpp
index bb6214b6011..f52c534498f 100644
--- a/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.cpp
+++ b/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.cpp
@@ -4,6 +4,8 @@
namespace vespalib {
+VESPA_THREAD_STACK_TAG(unnamed_blocking_executor);
+
bool
BlockingThreadStackExecutor::acceptNewTask(MonitorGuard & guard)
{
@@ -19,8 +21,15 @@ BlockingThreadStackExecutor::wakeup(MonitorGuard & monitor)
monitor.broadcast();
}
-BlockingThreadStackExecutor::BlockingThreadStackExecutor(uint32_t threads, uint32_t stackSize, uint32_t taskLimit) :
- ThreadStackExecutorBase(stackSize, taskLimit)
+BlockingThreadStackExecutor::BlockingThreadStackExecutor(uint32_t threads, uint32_t stackSize, uint32_t taskLimit)
+ : ThreadStackExecutorBase(stackSize, taskLimit, unnamed_blocking_executor)
+{
+ start(threads);
+}
+
+BlockingThreadStackExecutor::BlockingThreadStackExecutor(uint32_t threads, uint32_t stackSize, uint32_t taskLimit,
+ init_fun_t init_function)
+ : ThreadStackExecutorBase(stackSize, taskLimit, std::move(init_function))
{
start(threads);
}
diff --git a/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.h b/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.h
index 1274ed7fcea..13d8eece40e 100644
--- a/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.h
+++ b/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.h
@@ -27,6 +27,12 @@ public:
* @param taskLimit upper limit on accepted tasks
**/
BlockingThreadStackExecutor(uint32_t threads, uint32_t stackSize, uint32_t taskLimit);
+
+ // same as above, but enables you to specify a custom function
+ // used to wrap the main loop of all worker threads
+ BlockingThreadStackExecutor(uint32_t threads, uint32_t stackSize, uint32_t taskLimit,
+ init_fun_t init_function);
+
~BlockingThreadStackExecutor();
/**
diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutor.cpp b/vespalib/src/vespa/vespalib/util/threadstackexecutor.cpp
index 10fbd129ce6..aff52968c7c 100644
--- a/vespalib/src/vespa/vespalib/util/threadstackexecutor.cpp
+++ b/vespalib/src/vespa/vespalib/util/threadstackexecutor.cpp
@@ -4,6 +4,8 @@
namespace vespalib {
+VESPA_THREAD_STACK_TAG(unnamed_nonblocking_executor);
+
bool
ThreadStackExecutor::acceptNewTask(MonitorGuard &)
{
@@ -15,8 +17,16 @@ ThreadStackExecutor::wakeup(MonitorGuard &)
{
}
-ThreadStackExecutor::ThreadStackExecutor(uint32_t threads, uint32_t stackSize, uint32_t taskLimit) :
- ThreadStackExecutorBase(stackSize, taskLimit)
+ThreadStackExecutor::ThreadStackExecutor(uint32_t threads, uint32_t stackSize,
+ uint32_t taskLimit)
+ : ThreadStackExecutorBase(stackSize, taskLimit, unnamed_nonblocking_executor)
+{
+ start(threads);
+}
+
+ThreadStackExecutor::ThreadStackExecutor(uint32_t threads, uint32_t stackSize,
+ init_fun_t init_function, uint32_t taskLimit)
+ : ThreadStackExecutorBase(stackSize, taskLimit, std::move(init_function))
{
start(threads);
}
diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutor.h b/vespalib/src/vespa/vespalib/util/threadstackexecutor.h
index 27d8c3da325..3ac081be05f 100644
--- a/vespalib/src/vespa/vespalib/util/threadstackexecutor.h
+++ b/vespalib/src/vespa/vespalib/util/threadstackexecutor.h
@@ -1,5 +1,4 @@
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-// Copyright (C) 2010 Yahoo
#pragma once
@@ -30,6 +29,12 @@ public:
ThreadStackExecutor(uint32_t threads, uint32_t stackSize,
uint32_t taskLimit = 0xffffffff);
+ // same as above, but enables you to specify a custom function
+ // used to wrap the main loop of all worker threads
+ ThreadStackExecutor(uint32_t threads, uint32_t stackSize,
+ init_fun_t init_function,
+ uint32_t taskLimit = 0xffffffff);
+
/**
* Will invoke cleanup.
**/
diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp
index 28f9a57b3c5..263ff528ebd 100644
--- a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp
+++ b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp
@@ -5,6 +5,12 @@
namespace vespalib {
void
+ThreadStackExecutorBase::ThreadInit::Run(FastOS_ThreadInterface *, void *)
+{
+ init_fun(worker);
+}
+
+void
ThreadStackExecutorBase::BlockedThread::wait() const
{
MonitorGuard guard(monitor);
@@ -91,7 +97,7 @@ ThreadStackExecutorBase::obtainTask(Worker &worker)
}
void
-ThreadStackExecutorBase::Run(FastOS_ThreadInterface *, void *)
+ThreadStackExecutorBase::run()
{
Worker worker;
worker.verify(/* idle: */ true);
@@ -107,7 +113,8 @@ ThreadStackExecutorBase::Run(FastOS_ThreadInterface *, void *)
//-----------------------------------------------------------------------------
ThreadStackExecutorBase::ThreadStackExecutorBase(uint32_t stackSize,
- uint32_t taskLimit)
+ uint32_t taskLimit,
+ init_fun_t init_fun)
: _pool(stackSize),
_monitor(),
_stats(),
@@ -117,7 +124,8 @@ ThreadStackExecutorBase::ThreadStackExecutorBase(uint32_t stackSize,
_barrier(),
_taskCount(0),
_taskLimit(taskLimit),
- _closed(false)
+ _closed(false),
+ _thread_init(std::make_unique<ThreadInit>(*this, std::move(init_fun)))
{
assert(taskLimit > 0);
}
@@ -127,7 +135,7 @@ ThreadStackExecutorBase::start(uint32_t threads)
{
assert(threads > 0);
for (uint32_t i = 0; i < threads; ++i) {
- FastOS_ThreadInterface *thread = _pool.NewThread(this);
+ FastOS_ThreadInterface *thread = _pool.NewThread(_thread_init.get());
assert(thread != 0);
(void)thread;
}
diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h
index c2f5742009f..ee66ad71a38 100644
--- a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h
+++ b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h
@@ -7,17 +7,28 @@
#include "eventbarrier.hpp"
#include "arrayqueue.hpp"
#include "sync.h"
+#include "runnable.h"
#include <memory>
#include <vector>
#include <vespa/fastos/thread.h>
namespace vespalib {
+// Convenience macro used to create a function that can be used as an
+// init function when creating an executor to inject a frame with the
+// given name into the stack of all worker threads.
+
+#define VESPA_THREAD_STACK_TAG(name) \
+ int name(Runnable &worker) { \
+ worker.run(); \
+ return 1; \
+ }
+
/**
* An executor service that executes tasks in multiple threads.
**/
class ThreadStackExecutorBase : public ThreadExecutor,
- public FastOS_Runnable
+ public Runnable
{
public:
/**
@@ -31,7 +42,17 @@ public:
Stats() : maxPendingTasks(0), acceptedTasks(0), rejectedTasks(0) {}
};
+ using init_fun_t = std::function<int(Runnable&)>;
+
private:
+ struct ThreadInit : public FastOS_Runnable {
+ Runnable &worker;
+ init_fun_t init_fun;
+ explicit ThreadInit(Runnable &worker_in, init_fun_t init_fun_in)
+ : worker(worker_in), init_fun(std::move(init_fun_in)) {}
+ void Run(FastOS_ThreadInterface *, void *) override;
+ };
+
struct TaggedTask {
Task::UP task;
uint32_t token;
@@ -91,6 +112,7 @@ private:
uint32_t _taskCount;
uint32_t _taskLimit;
bool _closed;
+ std::unique_ptr<ThreadInit> _thread_init;
void block_thread(const LockGuard &, BlockedThread &blocked_thread);
void unblock_threads(const MonitorGuard &);
@@ -114,8 +136,8 @@ private:
**/
bool obtainTask(Worker &worker);
- // from FastOS_Runnable (all workers live here)
- void Run(FastOS_ThreadInterface *, void *) override;
+ // Runnable (all workers live here)
+ void run() override;
protected:
/**
@@ -155,8 +177,11 @@ protected:
*
* @param stackSize stack size per worker thread
* @param taskLimit upper limit on accepted tasks
+ * @param init_fun custom function used to wrap the main loop of
+ * each worker thread.
**/
- ThreadStackExecutorBase(uint32_t stackSize, uint32_t taskLimit);
+ ThreadStackExecutorBase(uint32_t stackSize, uint32_t taskLimit,
+ init_fun_t init_fun);
/**
* This will start the theads. This is to avoid starting tasks in