diff options
19 files changed, 82 insertions, 43 deletions
diff --git a/config/src/vespa/config/helper/configfetcher.cpp b/config/src/vespa/config/helper/configfetcher.cpp index 7a6f806c6ff..0ac3492c97f 100644 --- a/config/src/vespa/config/helper/configfetcher.cpp +++ b/config/src/vespa/config/helper/configfetcher.cpp @@ -9,9 +9,12 @@ LOG_SETUP(".config.helper.configfetcher"); namespace config { -ConfigFetcher::ConfigFetcher(const IConfigContext::SP & context) +VESPA_THREAD_STACK_TAG(config_fetcher_executor); + + + ConfigFetcher::ConfigFetcher(const IConfigContext::SP & context) : _poller(context), - _thread(std::make_unique<vespalib::Thread>(_poller)), + _thread(std::make_unique<vespalib::Thread>(_poller, config_fetcher_executor)), _closed(false), _started(false) { @@ -19,7 +22,7 @@ ConfigFetcher::ConfigFetcher(const IConfigContext::SP & context) ConfigFetcher::ConfigFetcher(const SourceSpec & spec) : _poller(std::make_shared<ConfigContext>(spec)), - _thread(std::make_unique<vespalib::Thread>(_poller)), + _thread(std::make_unique<vespalib::Thread>(_poller, config_fetcher_executor)), _closed(false), _started(false) { diff --git a/config/src/vespa/config/retriever/simpleconfigurer.cpp b/config/src/vespa/config/retriever/simpleconfigurer.cpp index 74022cfd6a3..9d6a6f752c3 100644 --- a/config/src/vespa/config/retriever/simpleconfigurer.cpp +++ b/config/src/vespa/config/retriever/simpleconfigurer.cpp @@ -8,10 +8,12 @@ LOG_SETUP(".config.retriever.simpleconfigurer"); namespace config { +VESPA_THREAD_STACK_TAG(simple_configurer_executor); + SimpleConfigurer::SimpleConfigurer(SimpleConfigRetriever::UP retriever, SimpleConfigurable * const configurable) : _retriever(std::move(retriever)), _configurable(configurable), - _thread(*this), + _thread(*this, simple_configurer_executor), _started(false) { assert(_retriever); diff --git a/searchcore/src/tests/proton/reference/gid_to_lid_change_handler/gid_to_lid_change_handler_test.cpp b/searchcore/src/tests/proton/reference/gid_to_lid_change_handler/gid_to_lid_change_handler_test.cpp index 3474a4297c7..516c31cb232 100644 --- a/searchcore/src/tests/proton/reference/gid_to_lid_change_handler/gid_to_lid_change_handler_test.cpp +++ b/searchcore/src/tests/proton/reference/gid_to_lid_change_handler/gid_to_lid_change_handler_test.cpp @@ -30,6 +30,10 @@ vespalib::string doc1("id:test:music::1"); } +TEST("control sizeof(PendingGidToLidChange)") { + EXPECT_EQUAL(48u, sizeof(PendingGidToLidChange)); +} + class ListenerStats { using lock_guard = std::lock_guard<std::mutex>; std::mutex _lock; diff --git a/slobrok/src/vespa/slobrok/server/slobrokserver.cpp b/slobrok/src/vespa/slobrok/server/slobrokserver.cpp index b962ecf611e..4223bedcf3f 100644 --- a/slobrok/src/vespa/slobrok/server/slobrokserver.cpp +++ b/slobrok/src/vespa/slobrok/server/slobrokserver.cpp @@ -7,16 +7,18 @@ LOG_SETUP(".slobrok.server"); namespace slobrok { +VESPA_THREAD_STACK_TAG(slobrok_server_executor); + SlobrokServer::SlobrokServer(ConfigShim &shim) : _env(shim), - _thread(*this) + _thread(*this, slobrok_server_executor) { _thread.start(); } SlobrokServer::SlobrokServer(uint32_t port) : _env(ConfigShim(port)), - _thread(*this) + _thread(*this, slobrok_server_executor) { _thread.start(); } diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp index 99791c36a9e..a99bce0a705 100644 --- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp @@ -19,7 +19,7 @@ SingleExecutor::SingleExecutor(init_fun_t func, uint32_t taskLimit, uint32_t wat _mutex(), _consumerCondition(), _producerCondition(), - _thread(*this), + _thread(*this, func), _idleTracker(steady_clock::now()), _threadIdleTracker(), _wakeupCount(0), @@ -32,7 +32,6 @@ SingleExecutor::SingleExecutor(init_fun_t func, uint32_t taskLimit, uint32_t wat _reactionTime(reactionTime), _closed(false) { - (void) func; //TODO implement similar to ThreadStackExecutor assert(taskLimit >= watermark); _thread.start(); } diff --git a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp index a3f0182ba30..ffd7d804663 100644 --- a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp +++ b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp @@ -22,6 +22,8 @@ namespace storage { namespace { +VESPA_THREAD_STACK_TAG(test_executor); + // Exploit the fact that PersistenceProviderWrapper already provides a forwarding // implementation of all SPI calls, so we can selectively override. class BlockingMockProvider : public PersistenceProviderWrapper @@ -294,7 +296,7 @@ TEST_F(OperationAbortingTest, wait_for_current_operation_completion_for_aborted_ auto abortCmd = makeAbortCmd(abortSet); SendTask sendTask(abortCmd, *_queueBarrier, c.top); - vespalib::Thread thread(sendTask); + vespalib::Thread thread(sendTask, test_executor); thread.start(); LOG(debug, "waiting for threads to reach barriers"); diff --git a/vbench/src/apps/vbench/vbench.cpp b/vbench/src/apps/vbench/vbench.cpp index 00499519dcc..d37ed7688de 100644 --- a/vbench/src/apps/vbench/vbench.cpp +++ b/vbench/src/apps/vbench/vbench.cpp @@ -8,6 +8,8 @@ using namespace vbench; +VESPA_THREAD_STACK_TAG(vbench_executor); + typedef vespalib::SignalHandler SIG; struct NotifyDone : public vespalib::Runnable { @@ -31,8 +33,7 @@ int run(const std::string &cfg_name) { return 1; } vespalib::Slime cfg; - vespalib::Memory mapped_cfg(cfg_file.get().data, - cfg_file.get().size); + vespalib::Memory mapped_cfg(cfg_file.get().data, cfg_file.get().size); if (!vespalib::slime::JsonFormat::decode(mapped_cfg, cfg)) { fprintf(stderr, "unable to parse config file: %s\n", cfg.toString().c_str()); @@ -43,7 +44,7 @@ int run(const std::string &cfg_name) { VBench vbench(cfg); NotifyDone notify(done); vespalib::RunnablePair runBoth(vbench, notify); - vespalib::Thread thread(runBoth); + vespalib::Thread thread(runBoth, vbench_executor); thread.start(); while (!SIG::INT.check() && !SIG::TERM.check() && !done.await(1s)) {} if (!done.await(vespalib::duration::zero())) { diff --git a/vbench/src/tests/dispatcher/dispatcher_test.cpp b/vbench/src/tests/dispatcher/dispatcher_test.cpp index b2c002e3e50..99b57669120 100644 --- a/vbench/src/tests/dispatcher/dispatcher_test.cpp +++ b/vbench/src/tests/dispatcher/dispatcher_test.cpp @@ -17,6 +17,9 @@ struct Fetcher : public vespalib::Runnable { void run() override { handler.handle(provider.provide()); } }; +VESPA_THREAD_STACK_TAG(fetcher1_executor); +VESPA_THREAD_STACK_TAG(fetcher2_executor); + TEST("dispatcher") { MyHandler dropped; MyHandler handler1; @@ -24,8 +27,8 @@ TEST("dispatcher") { Dispatcher<int> dispatcher(dropped); Fetcher fetcher1(dispatcher, handler1); Fetcher fetcher2(dispatcher, handler2); - vespalib::Thread thread1(fetcher1); - vespalib::Thread thread2(fetcher2); + vespalib::Thread thread1(fetcher1, fetcher1_executor); + vespalib::Thread thread2(fetcher2, fetcher2_executor); thread1.start(); EXPECT_TRUE(dispatcher.waitForThreads(1, 512)); thread2.start(); diff --git a/vbench/src/tests/handler_thread/handler_thread_test.cpp b/vbench/src/tests/handler_thread/handler_thread_test.cpp index fd7d630f705..6b5dbedd0ac 100644 --- a/vbench/src/tests/handler_thread/handler_thread_test.cpp +++ b/vbench/src/tests/handler_thread/handler_thread_test.cpp @@ -15,9 +15,11 @@ struct MyHandler : Handler<int> { MyHandler::~MyHandler() = default; +VESPA_THREAD_STACK_TAG(test_executor); + TEST("handler thread") { MyHandler handler; - HandlerThread<int> th(handler); + HandlerThread<int> th(handler, test_executor); th.handle(std::unique_ptr<int>(new int(1))); th.handle(std::unique_ptr<int>(new int(2))); th.handle(std::unique_ptr<int>(new int(3))); diff --git a/vbench/src/vbench/core/handler_thread.h b/vbench/src/vbench/core/handler_thread.h index b4aaf08eee8..402ecbeb0dc 100644 --- a/vbench/src/vbench/core/handler_thread.h +++ b/vbench/src/vbench/core/handler_thread.h @@ -33,7 +33,7 @@ private: void run() override; public: - HandlerThread(Handler<T> &next); + HandlerThread(Handler<T> &next, init_fun_t init_fun); ~HandlerThread(); void handle(std::unique_ptr<T> obj) override; void join() override; diff --git a/vbench/src/vbench/core/handler_thread.hpp b/vbench/src/vbench/core/handler_thread.hpp index 3d1dc423411..56cc0a7771d 100644 --- a/vbench/src/vbench/core/handler_thread.hpp +++ b/vbench/src/vbench/core/handler_thread.hpp @@ -23,12 +23,12 @@ HandlerThread<T>::run() } template <typename T> -HandlerThread<T>::HandlerThread(Handler<T> &next) +HandlerThread<T>::HandlerThread(Handler<T> &next, init_fun_t init_fun) : _lock(), _cond(), _queue(), _next(next), - _thread(*this), + _thread(*this, init_fun), _done(false) { _thread.start(); diff --git a/vbench/src/vbench/vbench/request_scheduler.cpp b/vbench/src/vbench/vbench/request_scheduler.cpp index 80aec6c308e..320ecc91fc6 100644 --- a/vbench/src/vbench/vbench/request_scheduler.cpp +++ b/vbench/src/vbench/vbench/request_scheduler.cpp @@ -1,11 +1,13 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "request_scheduler.h" - #include <vbench/core/timer.h> namespace vbench { +VESPA_THREAD_STACK_TAG(vbench_request_scheduler_executor); +VESPA_THREAD_STACK_TAG(vbench_handler_executor); + void RequestScheduler::run() { @@ -24,11 +26,11 @@ RequestScheduler::run() RequestScheduler::RequestScheduler(CryptoEngine::SP crypto, Handler<Request> &next, size_t numWorkers) : _timer(), - _proxy(next), + _proxy(next, vbench_handler_executor), _queue(10.0, 0.020), _droppedTagger(_proxy), _dispatcher(_droppedTagger), - _thread(*this), + _thread(*this, vbench_request_scheduler_executor), _connectionPool(std::move(crypto), _timer), _workers() { diff --git a/vbench/src/vbench/vbench/vbench.cpp b/vbench/src/vbench/vbench/vbench.cpp index d636f7a1cd7..9a5adad262e 100644 --- a/vbench/src/vbench/vbench/vbench.cpp +++ b/vbench/src/vbench/vbench/vbench.cpp @@ -40,6 +40,8 @@ CryptoEngine::SP setup_crypto(const vespalib::slime::Inspector &tls) { } // namespace vbench::<unnamed> +VESPA_THREAD_STACK_TAG(vbench_inputchain_generator); + VBench::VBench(const vespalib::Slime &cfg) : _factory(), _analyzers(), @@ -76,7 +78,7 @@ VBench::VBench(const vespalib::Slime &cfg) } inputChain->generator = _factory.createGenerator(generator, *inputChain->taggers.back()); if (inputChain->generator.get() != 0) { - inputChain->thread.reset(new vespalib::Thread(*inputChain->generator)); + inputChain->thread.reset(new vespalib::Thread(*inputChain->generator, vbench_inputchain_generator)); _inputs.push_back(std::move(inputChain)); } } diff --git a/vbench/src/vbench/vbench/worker.cpp b/vbench/src/vbench/vbench/worker.cpp index a64956f710b..08f788ea3e9 100644 --- a/vbench/src/vbench/vbench/worker.cpp +++ b/vbench/src/vbench/vbench/worker.cpp @@ -5,6 +5,8 @@ namespace vbench { +VESPA_THREAD_STACK_TAG(vbench_worker_executor); + void Worker::run() { @@ -22,7 +24,7 @@ Worker::run() Worker::Worker(Provider<Request> &provider, Handler<Request> &next, HttpConnectionPool &pool, Timer &timer) - : _thread(*this), + : _thread(*this, vbench_worker_executor), _provider(provider), _next(next), _pool(pool), diff --git a/vespalib/src/tests/thread/thread_test.cpp b/vespalib/src/tests/thread/thread_test.cpp index 43951b4b734..38abf810354 100644 --- a/vespalib/src/tests/thread/thread_test.cpp +++ b/vespalib/src/tests/thread/thread_test.cpp @@ -6,6 +6,8 @@ using namespace vespalib; +VESPA_THREAD_STACK_TAG(test_agent_executor); + struct Agent : public Runnable { bool started; int loopCnt; @@ -22,7 +24,7 @@ struct Agent : public Runnable { TEST("thread never started") { Agent agent; { - Thread thread(agent); + Thread thread(agent, test_agent_executor); } EXPECT_TRUE(!agent.started); EXPECT_EQUAL(0, agent.loopCnt); @@ -31,7 +33,7 @@ TEST("thread never started") { TEST("normal operation") { Agent agent; { - Thread thread(agent); + Thread thread(agent, test_agent_executor); thread.start(); std::this_thread::sleep_for(20ms); thread.stop().join(); @@ -43,7 +45,7 @@ TEST("normal operation") { TEST("stop before start") { Agent agent; { - Thread thread(agent); + Thread thread(agent, test_agent_executor); thread.stop(); thread.start(); thread.join(); diff --git a/vespalib/src/vespa/vespalib/util/simple_thread_bundle.cpp b/vespalib/src/vespa/vespalib/util/simple_thread_bundle.cpp index 80bbb3a7ad2..ab83d4e05fd 100644 --- a/vespalib/src/vespa/vespalib/util/simple_thread_bundle.cpp +++ b/vespalib/src/vespa/vespalib/util/simple_thread_bundle.cpp @@ -8,6 +8,8 @@ using namespace vespalib::fixed_thread_bundle; namespace vespalib { +VESPA_THREAD_STACK_TAG(simple_thread_bundle_executor); + namespace { struct SignalHook : Runnable { @@ -43,7 +45,7 @@ Runnable::UP wrap(Runnable *runnable) { } Runnable::UP chain(Runnable::UP first, Runnable::UP second) { - return Runnable::UP(new HookPair(std::move(first), std::move(second))); + return std::make_unique<HookPair>(std::move(first), std::move(second)); } } // namespace vespalib::<unnamed> @@ -173,4 +175,19 @@ SimpleThreadBundle::run(const std::vector<Runnable*> &targets) latch.await(); } +SimpleThreadBundle::Worker::Worker(Signal &s, Runnable::UP h) + : thread(*this, simple_thread_bundle_executor), + signal(s), + hook(std::move(h)) +{ + thread.start(); +} +void +SimpleThreadBundle::Worker::run() { + for (size_t gen = 0; signal.wait(gen) > 0; ) { + hook->run(); +} + +} + } // namespace vespalib diff --git a/vespalib/src/vespa/vespalib/util/simple_thread_bundle.h b/vespalib/src/vespa/vespalib/util/simple_thread_bundle.h index f0aaccc2525..d9a29ee7bef 100644 --- a/vespalib/src/vespa/vespalib/util/simple_thread_bundle.h +++ b/vespalib/src/vespa/vespalib/util/simple_thread_bundle.h @@ -112,14 +112,8 @@ private: Thread thread; Signal &signal; Runnable::UP hook; - Worker(Signal &s, Runnable::UP h) : thread(*this), signal(s), hook(std::move(h)) { - thread.start(); - } - void run() override { - for (size_t gen = 0; signal.wait(gen) > 0; ) { - hook->run(); - } - } + Worker(Signal &s, Runnable::UP h); + void run() override; }; Work _work; diff --git a/vespalib/src/vespa/vespalib/util/thread.cpp b/vespalib/src/vespa/vespalib/util/thread.cpp index c02a7a3b063..c3230bf313d 100644 --- a/vespalib/src/vespa/vespalib/util/thread.cpp +++ b/vespalib/src/vespa/vespalib/util/thread.cpp @@ -9,9 +9,9 @@ namespace vespalib { __thread Thread *Thread::_currentThread = nullptr; -Thread::Proxy::Proxy(Thread &parent, Runnable &target) - : thread(parent), runnable(target), - start(), started(), cancel(false) +Thread::Proxy::Proxy(Thread &parent, Runnable &target, init_fun_t init_fun_in) + : thread(parent), runnable(target), init_fun(std::move(init_fun_in)), + start(), started(), cancel(false) { } void @@ -22,7 +22,7 @@ Thread::Proxy::Run(FastOS_ThreadInterface *, void *) start.await(); if (!cancel) { started.countDown(); - runnable.run(); + init_fun(runnable); } assert(_currentThread == &thread); _currentThread = nullptr; @@ -30,8 +30,8 @@ Thread::Proxy::Run(FastOS_ThreadInterface *, void *) Thread::Proxy::~Proxy() = default; -Thread::Thread(Runnable &runnable) - : _proxy(*this, runnable), +Thread::Thread(Runnable &runnable, init_fun_t init_fun_in) + : _proxy(*this, runnable, std::move(init_fun_in)), _pool(STACK_SIZE, 1), _lock(), _cond(), diff --git a/vespalib/src/vespa/vespalib/util/thread.h b/vespalib/src/vespa/vespalib/util/thread.h index 8873f23ee98..e08f3ca1100 100644 --- a/vespalib/src/vespa/vespalib/util/thread.h +++ b/vespalib/src/vespa/vespalib/util/thread.h @@ -15,17 +15,19 @@ namespace vespalib { class Thread : public Active { private: + using init_fun_t = Runnable::init_fun_t; enum { STACK_SIZE = 256*1024 }; static __thread Thread *_currentThread; struct Proxy : FastOS_Runnable { Thread &thread; Runnable &runnable; + init_fun_t init_fun; vespalib::Gate start; vespalib::Gate started; bool cancel; - Proxy(Thread &parent, Runnable &target); + Proxy(Thread &parent, Runnable &target, init_fun_t init_fun_in); ~Proxy() override; void Run(FastOS_ThreadInterface *thisThread, void *arguments) override; @@ -39,7 +41,7 @@ private: bool _woken; public: - Thread(Runnable &runnable); + Thread(Runnable &runnable, init_fun_t init_fun_in); ~Thread() override; void start() override; Thread &stop() override; |