diff options
9 files changed, 138 insertions, 14 deletions
diff --git a/searchlib/src/vespa/searchlib/engine/transportserver.cpp b/searchlib/src/vespa/searchlib/engine/transportserver.cpp index 5462be8039e..4e46035b687 100644 --- a/searchlib/src/vespa/searchlib/engine/transportserver.cpp +++ b/searchlib/src/vespa/searchlib/engine/transportserver.cpp @@ -382,12 +382,15 @@ TransportServer::TransportServer(SearchServer &searchServer, bool TransportServer::start() { + if (!updateListen()) { + return false; + } if (_threadPool.NewThread(this) == 0) { LOG(error, "Could not start internal transport thread"); _failed = true; return false; } - return updateListen(); + return true; } int diff --git a/vespalib/src/tests/executor/blockingthreadstackexecutor_test.cpp b/vespalib/src/tests/executor/blockingthreadstackexecutor_test.cpp index f8136fe2a10..6b295497744 100644 --- a/vespalib/src/tests/executor/blockingthreadstackexecutor_test.cpp +++ b/vespalib/src/tests/executor/blockingthreadstackexecutor_test.cpp @@ -4,6 +4,7 @@ #include <vespa/vespalib/util/blockingthreadstackexecutor.h> #include <vespa/vespalib/util/executor.h> #include <vespa/vespalib/util/sync.h> +#include <vespa/vespalib/util/backtrace.h> #include <thread> using namespace vespalib; @@ -108,4 +109,32 @@ TEST_F("require that task limit can be decreased", Fixture(3, 3)) f.blockedExecuteAndWaitUntilFinished(); } +vespalib::string get_worker_stack_trace(BlockingThreadStackExecutor &executor) { + struct StackTraceTask : public Executor::Task { + vespalib::string &trace; + explicit StackTraceTask(vespalib::string &t) : trace(t) {} + void run() override { trace = getStackTrace(0); } + }; + vespalib::string trace; + executor.execute(std::make_unique<StackTraceTask>(trace)); + executor.sync(); + return trace; +} + +VESPA_THREAD_STACK_TAG(my_stack_tag); + +TEST_F("require that executor has appropriate default thread stack tag", BlockingThreadStackExecutor(1, 128*1024, 10)) { + vespalib::string trace = get_worker_stack_trace(f1); + if (!EXPECT_TRUE(trace.find("unnamed_blocking_executor") != vespalib::string::npos)) { + fprintf(stderr, "%s\n", trace.c_str()); + } +} + +TEST_F("require that executor thread stack tag can be set", BlockingThreadStackExecutor(1, 128*1024, 10, my_stack_tag)) { + vespalib::string trace = get_worker_stack_trace(f1); + if (!EXPECT_TRUE(trace.find("my_stack_tag") != vespalib::string::npos)) { + fprintf(stderr, "%s\n", trace.c_str()); + } +} + TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/vespalib/src/tests/executor/threadstackexecutor_test.cpp b/vespalib/src/tests/executor/threadstackexecutor_test.cpp index cb998a4323c..143a45c5f3f 100644 --- a/vespalib/src/tests/executor/threadstackexecutor_test.cpp +++ b/vespalib/src/tests/executor/threadstackexecutor_test.cpp @@ -4,6 +4,7 @@ #include <vespa/vespalib/util/threadstackexecutor.h> #include <vespa/vespalib/util/sync.h> +#include <vespa/vespalib/util/backtrace.h> using namespace vespalib; @@ -157,4 +158,32 @@ TEST_MT_F("require that threads can wait for a specific task count", 7, WaitStat } } +vespalib::string get_worker_stack_trace(ThreadStackExecutor &executor) { + struct StackTraceTask : public Executor::Task { + vespalib::string &trace; + explicit StackTraceTask(vespalib::string &t) : trace(t) {} + void run() override { trace = getStackTrace(0); } + }; + vespalib::string trace; + executor.execute(std::make_unique<StackTraceTask>(trace)); + executor.sync(); + return trace; +} + +VESPA_THREAD_STACK_TAG(my_stack_tag); + +TEST_F("require that executor has appropriate default thread stack tag", ThreadStackExecutor(1, 128*1024)) { + vespalib::string trace = get_worker_stack_trace(f1); + if (!EXPECT_TRUE(trace.find("unnamed_nonblocking_executor") != vespalib::string::npos)) { + fprintf(stderr, "%s\n", trace.c_str()); + } +} + +TEST_F("require that executor thread stack tag can be set", ThreadStackExecutor(1, 128*1024, my_stack_tag)) { + vespalib::string trace = get_worker_stack_trace(f1); + if (!EXPECT_TRUE(trace.find("my_stack_tag") != vespalib::string::npos)) { + fprintf(stderr, "%s\n", trace.c_str()); + } +} + TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.cpp b/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.cpp index bb6214b6011..f52c534498f 100644 --- a/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.cpp +++ b/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.cpp @@ -4,6 +4,8 @@ namespace vespalib { +VESPA_THREAD_STACK_TAG(unnamed_blocking_executor); + bool BlockingThreadStackExecutor::acceptNewTask(MonitorGuard & guard) { @@ -19,8 +21,15 @@ BlockingThreadStackExecutor::wakeup(MonitorGuard & monitor) monitor.broadcast(); } -BlockingThreadStackExecutor::BlockingThreadStackExecutor(uint32_t threads, uint32_t stackSize, uint32_t taskLimit) : - ThreadStackExecutorBase(stackSize, taskLimit) +BlockingThreadStackExecutor::BlockingThreadStackExecutor(uint32_t threads, uint32_t stackSize, uint32_t taskLimit) + : ThreadStackExecutorBase(stackSize, taskLimit, unnamed_blocking_executor) +{ + start(threads); +} + +BlockingThreadStackExecutor::BlockingThreadStackExecutor(uint32_t threads, uint32_t stackSize, uint32_t taskLimit, + init_fun_t init_function) + : ThreadStackExecutorBase(stackSize, taskLimit, std::move(init_function)) { start(threads); } diff --git a/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.h b/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.h index 1274ed7fcea..13d8eece40e 100644 --- a/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.h +++ b/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.h @@ -27,6 +27,12 @@ public: * @param taskLimit upper limit on accepted tasks **/ BlockingThreadStackExecutor(uint32_t threads, uint32_t stackSize, uint32_t taskLimit); + + // same as above, but enables you to specify a custom function + // used to wrap the main loop of all worker threads + BlockingThreadStackExecutor(uint32_t threads, uint32_t stackSize, uint32_t taskLimit, + init_fun_t init_function); + ~BlockingThreadStackExecutor(); /** diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutor.cpp b/vespalib/src/vespa/vespalib/util/threadstackexecutor.cpp index 10fbd129ce6..aff52968c7c 100644 --- a/vespalib/src/vespa/vespalib/util/threadstackexecutor.cpp +++ b/vespalib/src/vespa/vespalib/util/threadstackexecutor.cpp @@ -4,6 +4,8 @@ namespace vespalib { +VESPA_THREAD_STACK_TAG(unnamed_nonblocking_executor); + bool ThreadStackExecutor::acceptNewTask(MonitorGuard &) { @@ -15,8 +17,16 @@ ThreadStackExecutor::wakeup(MonitorGuard &) { } -ThreadStackExecutor::ThreadStackExecutor(uint32_t threads, uint32_t stackSize, uint32_t taskLimit) : - ThreadStackExecutorBase(stackSize, taskLimit) +ThreadStackExecutor::ThreadStackExecutor(uint32_t threads, uint32_t stackSize, + uint32_t taskLimit) + : ThreadStackExecutorBase(stackSize, taskLimit, unnamed_nonblocking_executor) +{ + start(threads); +} + +ThreadStackExecutor::ThreadStackExecutor(uint32_t threads, uint32_t stackSize, + init_fun_t init_function, uint32_t taskLimit) + : ThreadStackExecutorBase(stackSize, taskLimit, std::move(init_function)) { start(threads); } diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutor.h b/vespalib/src/vespa/vespalib/util/threadstackexecutor.h index 27d8c3da325..3ac081be05f 100644 --- a/vespalib/src/vespa/vespalib/util/threadstackexecutor.h +++ b/vespalib/src/vespa/vespalib/util/threadstackexecutor.h @@ -1,5 +1,4 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -// Copyright (C) 2010 Yahoo #pragma once @@ -30,6 +29,12 @@ public: ThreadStackExecutor(uint32_t threads, uint32_t stackSize, uint32_t taskLimit = 0xffffffff); + // same as above, but enables you to specify a custom function + // used to wrap the main loop of all worker threads + ThreadStackExecutor(uint32_t threads, uint32_t stackSize, + init_fun_t init_function, + uint32_t taskLimit = 0xffffffff); + /** * Will invoke cleanup. **/ diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp index 28f9a57b3c5..263ff528ebd 100644 --- a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp +++ b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp @@ -5,6 +5,12 @@ namespace vespalib { void +ThreadStackExecutorBase::ThreadInit::Run(FastOS_ThreadInterface *, void *) +{ + init_fun(worker); +} + +void ThreadStackExecutorBase::BlockedThread::wait() const { MonitorGuard guard(monitor); @@ -91,7 +97,7 @@ ThreadStackExecutorBase::obtainTask(Worker &worker) } void -ThreadStackExecutorBase::Run(FastOS_ThreadInterface *, void *) +ThreadStackExecutorBase::run() { Worker worker; worker.verify(/* idle: */ true); @@ -107,7 +113,8 @@ ThreadStackExecutorBase::Run(FastOS_ThreadInterface *, void *) //----------------------------------------------------------------------------- ThreadStackExecutorBase::ThreadStackExecutorBase(uint32_t stackSize, - uint32_t taskLimit) + uint32_t taskLimit, + init_fun_t init_fun) : _pool(stackSize), _monitor(), _stats(), @@ -117,7 +124,8 @@ ThreadStackExecutorBase::ThreadStackExecutorBase(uint32_t stackSize, _barrier(), _taskCount(0), _taskLimit(taskLimit), - _closed(false) + _closed(false), + _thread_init(std::make_unique<ThreadInit>(*this, std::move(init_fun))) { assert(taskLimit > 0); } @@ -127,7 +135,7 @@ ThreadStackExecutorBase::start(uint32_t threads) { assert(threads > 0); for (uint32_t i = 0; i < threads; ++i) { - FastOS_ThreadInterface *thread = _pool.NewThread(this); + FastOS_ThreadInterface *thread = _pool.NewThread(_thread_init.get()); assert(thread != 0); (void)thread; } diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h index c2f5742009f..ee66ad71a38 100644 --- a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h +++ b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h @@ -7,17 +7,28 @@ #include "eventbarrier.hpp" #include "arrayqueue.hpp" #include "sync.h" +#include "runnable.h" #include <memory> #include <vector> #include <vespa/fastos/thread.h> namespace vespalib { +// Convenience macro used to create a function that can be used as an +// init function when creating an executor to inject a frame with the +// given name into the stack of all worker threads. + +#define VESPA_THREAD_STACK_TAG(name) \ + int name(Runnable &worker) { \ + worker.run(); \ + return 1; \ + } + /** * An executor service that executes tasks in multiple threads. **/ class ThreadStackExecutorBase : public ThreadExecutor, - public FastOS_Runnable + public Runnable { public: /** @@ -31,7 +42,17 @@ public: Stats() : maxPendingTasks(0), acceptedTasks(0), rejectedTasks(0) {} }; + using init_fun_t = std::function<int(Runnable&)>; + private: + struct ThreadInit : public FastOS_Runnable { + Runnable &worker; + init_fun_t init_fun; + explicit ThreadInit(Runnable &worker_in, init_fun_t init_fun_in) + : worker(worker_in), init_fun(std::move(init_fun_in)) {} + void Run(FastOS_ThreadInterface *, void *) override; + }; + struct TaggedTask { Task::UP task; uint32_t token; @@ -91,6 +112,7 @@ private: uint32_t _taskCount; uint32_t _taskLimit; bool _closed; + std::unique_ptr<ThreadInit> _thread_init; void block_thread(const LockGuard &, BlockedThread &blocked_thread); void unblock_threads(const MonitorGuard &); @@ -114,8 +136,8 @@ private: **/ bool obtainTask(Worker &worker); - // from FastOS_Runnable (all workers live here) - void Run(FastOS_ThreadInterface *, void *) override; + // Runnable (all workers live here) + void run() override; protected: /** @@ -155,8 +177,11 @@ protected: * * @param stackSize stack size per worker thread * @param taskLimit upper limit on accepted tasks + * @param init_fun custom function used to wrap the main loop of + * each worker thread. **/ - ThreadStackExecutorBase(uint32_t stackSize, uint32_t taskLimit); + ThreadStackExecutorBase(uint32_t stackSize, uint32_t taskLimit, + init_fun_t init_fun); /** * This will start the theads. This is to avoid starting tasks in |