summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--config/src/vespa/config/helper/configfetcher.cpp9
-rw-r--r--config/src/vespa/config/retriever/simpleconfigurer.cpp4
-rw-r--r--searchcore/src/tests/proton/reference/gid_to_lid_change_handler/gid_to_lid_change_handler_test.cpp4
-rw-r--r--slobrok/src/vespa/slobrok/server/slobrokserver.cpp6
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp3
-rw-r--r--storage/src/tests/persistence/filestorage/operationabortingtest.cpp4
-rw-r--r--vbench/src/apps/vbench/vbench.cpp7
-rw-r--r--vbench/src/tests/dispatcher/dispatcher_test.cpp7
-rw-r--r--vbench/src/tests/handler_thread/handler_thread_test.cpp4
-rw-r--r--vbench/src/vbench/core/handler_thread.h2
-rw-r--r--vbench/src/vbench/core/handler_thread.hpp4
-rw-r--r--vbench/src/vbench/vbench/request_scheduler.cpp8
-rw-r--r--vbench/src/vbench/vbench/vbench.cpp4
-rw-r--r--vbench/src/vbench/vbench/worker.cpp4
-rw-r--r--vespalib/src/tests/thread/thread_test.cpp8
-rw-r--r--vespalib/src/vespa/vespalib/util/simple_thread_bundle.cpp19
-rw-r--r--vespalib/src/vespa/vespalib/util/simple_thread_bundle.h10
-rw-r--r--vespalib/src/vespa/vespalib/util/thread.cpp12
-rw-r--r--vespalib/src/vespa/vespalib/util/thread.h6
19 files changed, 82 insertions, 43 deletions
diff --git a/config/src/vespa/config/helper/configfetcher.cpp b/config/src/vespa/config/helper/configfetcher.cpp
index 7a6f806c6ff..0ac3492c97f 100644
--- a/config/src/vespa/config/helper/configfetcher.cpp
+++ b/config/src/vespa/config/helper/configfetcher.cpp
@@ -9,9 +9,12 @@ LOG_SETUP(".config.helper.configfetcher");
namespace config {
-ConfigFetcher::ConfigFetcher(const IConfigContext::SP & context)
+VESPA_THREAD_STACK_TAG(config_fetcher_executor);
+
+
+ ConfigFetcher::ConfigFetcher(const IConfigContext::SP & context)
: _poller(context),
- _thread(std::make_unique<vespalib::Thread>(_poller)),
+ _thread(std::make_unique<vespalib::Thread>(_poller, config_fetcher_executor)),
_closed(false),
_started(false)
{
@@ -19,7 +22,7 @@ ConfigFetcher::ConfigFetcher(const IConfigContext::SP & context)
ConfigFetcher::ConfigFetcher(const SourceSpec & spec)
: _poller(std::make_shared<ConfigContext>(spec)),
- _thread(std::make_unique<vespalib::Thread>(_poller)),
+ _thread(std::make_unique<vespalib::Thread>(_poller, config_fetcher_executor)),
_closed(false),
_started(false)
{
diff --git a/config/src/vespa/config/retriever/simpleconfigurer.cpp b/config/src/vespa/config/retriever/simpleconfigurer.cpp
index 74022cfd6a3..9d6a6f752c3 100644
--- a/config/src/vespa/config/retriever/simpleconfigurer.cpp
+++ b/config/src/vespa/config/retriever/simpleconfigurer.cpp
@@ -8,10 +8,12 @@ LOG_SETUP(".config.retriever.simpleconfigurer");
namespace config {
+VESPA_THREAD_STACK_TAG(simple_configurer_executor);
+
SimpleConfigurer::SimpleConfigurer(SimpleConfigRetriever::UP retriever, SimpleConfigurable * const configurable)
: _retriever(std::move(retriever)),
_configurable(configurable),
- _thread(*this),
+ _thread(*this, simple_configurer_executor),
_started(false)
{
assert(_retriever);
diff --git a/searchcore/src/tests/proton/reference/gid_to_lid_change_handler/gid_to_lid_change_handler_test.cpp b/searchcore/src/tests/proton/reference/gid_to_lid_change_handler/gid_to_lid_change_handler_test.cpp
index 3474a4297c7..516c31cb232 100644
--- a/searchcore/src/tests/proton/reference/gid_to_lid_change_handler/gid_to_lid_change_handler_test.cpp
+++ b/searchcore/src/tests/proton/reference/gid_to_lid_change_handler/gid_to_lid_change_handler_test.cpp
@@ -30,6 +30,10 @@ vespalib::string doc1("id:test:music::1");
}
+TEST("control sizeof(PendingGidToLidChange)") {
+ EXPECT_EQUAL(48u, sizeof(PendingGidToLidChange));
+}
+
class ListenerStats {
using lock_guard = std::lock_guard<std::mutex>;
std::mutex _lock;
diff --git a/slobrok/src/vespa/slobrok/server/slobrokserver.cpp b/slobrok/src/vespa/slobrok/server/slobrokserver.cpp
index b962ecf611e..4223bedcf3f 100644
--- a/slobrok/src/vespa/slobrok/server/slobrokserver.cpp
+++ b/slobrok/src/vespa/slobrok/server/slobrokserver.cpp
@@ -7,16 +7,18 @@ LOG_SETUP(".slobrok.server");
namespace slobrok {
+VESPA_THREAD_STACK_TAG(slobrok_server_executor);
+
SlobrokServer::SlobrokServer(ConfigShim &shim)
: _env(shim),
- _thread(*this)
+ _thread(*this, slobrok_server_executor)
{
_thread.start();
}
SlobrokServer::SlobrokServer(uint32_t port)
: _env(ConfigShim(port)),
- _thread(*this)
+ _thread(*this, slobrok_server_executor)
{
_thread.start();
}
diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
index 99791c36a9e..a99bce0a705 100644
--- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
@@ -19,7 +19,7 @@ SingleExecutor::SingleExecutor(init_fun_t func, uint32_t taskLimit, uint32_t wat
_mutex(),
_consumerCondition(),
_producerCondition(),
- _thread(*this),
+ _thread(*this, func),
_idleTracker(steady_clock::now()),
_threadIdleTracker(),
_wakeupCount(0),
@@ -32,7 +32,6 @@ SingleExecutor::SingleExecutor(init_fun_t func, uint32_t taskLimit, uint32_t wat
_reactionTime(reactionTime),
_closed(false)
{
- (void) func; //TODO implement similar to ThreadStackExecutor
assert(taskLimit >= watermark);
_thread.start();
}
diff --git a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp
index a3f0182ba30..ffd7d804663 100644
--- a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp
+++ b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp
@@ -22,6 +22,8 @@ namespace storage {
namespace {
+VESPA_THREAD_STACK_TAG(test_executor);
+
// Exploit the fact that PersistenceProviderWrapper already provides a forwarding
// implementation of all SPI calls, so we can selectively override.
class BlockingMockProvider : public PersistenceProviderWrapper
@@ -294,7 +296,7 @@ TEST_F(OperationAbortingTest, wait_for_current_operation_completion_for_aborted_
auto abortCmd = makeAbortCmd(abortSet);
SendTask sendTask(abortCmd, *_queueBarrier, c.top);
- vespalib::Thread thread(sendTask);
+ vespalib::Thread thread(sendTask, test_executor);
thread.start();
LOG(debug, "waiting for threads to reach barriers");
diff --git a/vbench/src/apps/vbench/vbench.cpp b/vbench/src/apps/vbench/vbench.cpp
index 00499519dcc..d37ed7688de 100644
--- a/vbench/src/apps/vbench/vbench.cpp
+++ b/vbench/src/apps/vbench/vbench.cpp
@@ -8,6 +8,8 @@
using namespace vbench;
+VESPA_THREAD_STACK_TAG(vbench_executor);
+
typedef vespalib::SignalHandler SIG;
struct NotifyDone : public vespalib::Runnable {
@@ -31,8 +33,7 @@ int run(const std::string &cfg_name) {
return 1;
}
vespalib::Slime cfg;
- vespalib::Memory mapped_cfg(cfg_file.get().data,
- cfg_file.get().size);
+ vespalib::Memory mapped_cfg(cfg_file.get().data, cfg_file.get().size);
if (!vespalib::slime::JsonFormat::decode(mapped_cfg, cfg)) {
fprintf(stderr, "unable to parse config file: %s\n",
cfg.toString().c_str());
@@ -43,7 +44,7 @@ int run(const std::string &cfg_name) {
VBench vbench(cfg);
NotifyDone notify(done);
vespalib::RunnablePair runBoth(vbench, notify);
- vespalib::Thread thread(runBoth);
+ vespalib::Thread thread(runBoth, vbench_executor);
thread.start();
while (!SIG::INT.check() && !SIG::TERM.check() && !done.await(1s)) {}
if (!done.await(vespalib::duration::zero())) {
diff --git a/vbench/src/tests/dispatcher/dispatcher_test.cpp b/vbench/src/tests/dispatcher/dispatcher_test.cpp
index b2c002e3e50..99b57669120 100644
--- a/vbench/src/tests/dispatcher/dispatcher_test.cpp
+++ b/vbench/src/tests/dispatcher/dispatcher_test.cpp
@@ -17,6 +17,9 @@ struct Fetcher : public vespalib::Runnable {
void run() override { handler.handle(provider.provide()); }
};
+VESPA_THREAD_STACK_TAG(fetcher1_executor);
+VESPA_THREAD_STACK_TAG(fetcher2_executor);
+
TEST("dispatcher") {
MyHandler dropped;
MyHandler handler1;
@@ -24,8 +27,8 @@ TEST("dispatcher") {
Dispatcher<int> dispatcher(dropped);
Fetcher fetcher1(dispatcher, handler1);
Fetcher fetcher2(dispatcher, handler2);
- vespalib::Thread thread1(fetcher1);
- vespalib::Thread thread2(fetcher2);
+ vespalib::Thread thread1(fetcher1, fetcher1_executor);
+ vespalib::Thread thread2(fetcher2, fetcher2_executor);
thread1.start();
EXPECT_TRUE(dispatcher.waitForThreads(1, 512));
thread2.start();
diff --git a/vbench/src/tests/handler_thread/handler_thread_test.cpp b/vbench/src/tests/handler_thread/handler_thread_test.cpp
index fd7d630f705..6b5dbedd0ac 100644
--- a/vbench/src/tests/handler_thread/handler_thread_test.cpp
+++ b/vbench/src/tests/handler_thread/handler_thread_test.cpp
@@ -15,9 +15,11 @@ struct MyHandler : Handler<int> {
MyHandler::~MyHandler() = default;
+VESPA_THREAD_STACK_TAG(test_executor);
+
TEST("handler thread") {
MyHandler handler;
- HandlerThread<int> th(handler);
+ HandlerThread<int> th(handler, test_executor);
th.handle(std::unique_ptr<int>(new int(1)));
th.handle(std::unique_ptr<int>(new int(2)));
th.handle(std::unique_ptr<int>(new int(3)));
diff --git a/vbench/src/vbench/core/handler_thread.h b/vbench/src/vbench/core/handler_thread.h
index b4aaf08eee8..402ecbeb0dc 100644
--- a/vbench/src/vbench/core/handler_thread.h
+++ b/vbench/src/vbench/core/handler_thread.h
@@ -33,7 +33,7 @@ private:
void run() override;
public:
- HandlerThread(Handler<T> &next);
+ HandlerThread(Handler<T> &next, init_fun_t init_fun);
~HandlerThread();
void handle(std::unique_ptr<T> obj) override;
void join() override;
diff --git a/vbench/src/vbench/core/handler_thread.hpp b/vbench/src/vbench/core/handler_thread.hpp
index 3d1dc423411..56cc0a7771d 100644
--- a/vbench/src/vbench/core/handler_thread.hpp
+++ b/vbench/src/vbench/core/handler_thread.hpp
@@ -23,12 +23,12 @@ HandlerThread<T>::run()
}
template <typename T>
-HandlerThread<T>::HandlerThread(Handler<T> &next)
+HandlerThread<T>::HandlerThread(Handler<T> &next, init_fun_t init_fun)
: _lock(),
_cond(),
_queue(),
_next(next),
- _thread(*this),
+ _thread(*this, init_fun),
_done(false)
{
_thread.start();
diff --git a/vbench/src/vbench/vbench/request_scheduler.cpp b/vbench/src/vbench/vbench/request_scheduler.cpp
index 80aec6c308e..320ecc91fc6 100644
--- a/vbench/src/vbench/vbench/request_scheduler.cpp
+++ b/vbench/src/vbench/vbench/request_scheduler.cpp
@@ -1,11 +1,13 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "request_scheduler.h"
-
#include <vbench/core/timer.h>
namespace vbench {
+VESPA_THREAD_STACK_TAG(vbench_request_scheduler_executor);
+VESPA_THREAD_STACK_TAG(vbench_handler_executor);
+
void
RequestScheduler::run()
{
@@ -24,11 +26,11 @@ RequestScheduler::run()
RequestScheduler::RequestScheduler(CryptoEngine::SP crypto, Handler<Request> &next, size_t numWorkers)
: _timer(),
- _proxy(next),
+ _proxy(next, vbench_handler_executor),
_queue(10.0, 0.020),
_droppedTagger(_proxy),
_dispatcher(_droppedTagger),
- _thread(*this),
+ _thread(*this, vbench_request_scheduler_executor),
_connectionPool(std::move(crypto), _timer),
_workers()
{
diff --git a/vbench/src/vbench/vbench/vbench.cpp b/vbench/src/vbench/vbench/vbench.cpp
index d636f7a1cd7..9a5adad262e 100644
--- a/vbench/src/vbench/vbench/vbench.cpp
+++ b/vbench/src/vbench/vbench/vbench.cpp
@@ -40,6 +40,8 @@ CryptoEngine::SP setup_crypto(const vespalib::slime::Inspector &tls) {
} // namespace vbench::<unnamed>
+VESPA_THREAD_STACK_TAG(vbench_inputchain_generator);
+
VBench::VBench(const vespalib::Slime &cfg)
: _factory(),
_analyzers(),
@@ -76,7 +78,7 @@ VBench::VBench(const vespalib::Slime &cfg)
}
inputChain->generator = _factory.createGenerator(generator, *inputChain->taggers.back());
if (inputChain->generator.get() != 0) {
- inputChain->thread.reset(new vespalib::Thread(*inputChain->generator));
+ inputChain->thread.reset(new vespalib::Thread(*inputChain->generator, vbench_inputchain_generator));
_inputs.push_back(std::move(inputChain));
}
}
diff --git a/vbench/src/vbench/vbench/worker.cpp b/vbench/src/vbench/vbench/worker.cpp
index a64956f710b..08f788ea3e9 100644
--- a/vbench/src/vbench/vbench/worker.cpp
+++ b/vbench/src/vbench/vbench/worker.cpp
@@ -5,6 +5,8 @@
namespace vbench {
+VESPA_THREAD_STACK_TAG(vbench_worker_executor);
+
void
Worker::run()
{
@@ -22,7 +24,7 @@ Worker::run()
Worker::Worker(Provider<Request> &provider, Handler<Request> &next,
HttpConnectionPool &pool, Timer &timer)
- : _thread(*this),
+ : _thread(*this, vbench_worker_executor),
_provider(provider),
_next(next),
_pool(pool),
diff --git a/vespalib/src/tests/thread/thread_test.cpp b/vespalib/src/tests/thread/thread_test.cpp
index 43951b4b734..38abf810354 100644
--- a/vespalib/src/tests/thread/thread_test.cpp
+++ b/vespalib/src/tests/thread/thread_test.cpp
@@ -6,6 +6,8 @@
using namespace vespalib;
+VESPA_THREAD_STACK_TAG(test_agent_executor);
+
struct Agent : public Runnable {
bool started;
int loopCnt;
@@ -22,7 +24,7 @@ struct Agent : public Runnable {
TEST("thread never started") {
Agent agent;
{
- Thread thread(agent);
+ Thread thread(agent, test_agent_executor);
}
EXPECT_TRUE(!agent.started);
EXPECT_EQUAL(0, agent.loopCnt);
@@ -31,7 +33,7 @@ TEST("thread never started") {
TEST("normal operation") {
Agent agent;
{
- Thread thread(agent);
+ Thread thread(agent, test_agent_executor);
thread.start();
std::this_thread::sleep_for(20ms);
thread.stop().join();
@@ -43,7 +45,7 @@ TEST("normal operation") {
TEST("stop before start") {
Agent agent;
{
- Thread thread(agent);
+ Thread thread(agent, test_agent_executor);
thread.stop();
thread.start();
thread.join();
diff --git a/vespalib/src/vespa/vespalib/util/simple_thread_bundle.cpp b/vespalib/src/vespa/vespalib/util/simple_thread_bundle.cpp
index 80bbb3a7ad2..ab83d4e05fd 100644
--- a/vespalib/src/vespa/vespalib/util/simple_thread_bundle.cpp
+++ b/vespalib/src/vespa/vespalib/util/simple_thread_bundle.cpp
@@ -8,6 +8,8 @@ using namespace vespalib::fixed_thread_bundle;
namespace vespalib {
+VESPA_THREAD_STACK_TAG(simple_thread_bundle_executor);
+
namespace {
struct SignalHook : Runnable {
@@ -43,7 +45,7 @@ Runnable::UP wrap(Runnable *runnable) {
}
Runnable::UP chain(Runnable::UP first, Runnable::UP second) {
- return Runnable::UP(new HookPair(std::move(first), std::move(second)));
+ return std::make_unique<HookPair>(std::move(first), std::move(second));
}
} // namespace vespalib::<unnamed>
@@ -173,4 +175,19 @@ SimpleThreadBundle::run(const std::vector<Runnable*> &targets)
latch.await();
}
+SimpleThreadBundle::Worker::Worker(Signal &s, Runnable::UP h)
+ : thread(*this, simple_thread_bundle_executor),
+ signal(s),
+ hook(std::move(h))
+{
+ thread.start();
+}
+void
+SimpleThreadBundle::Worker::run() {
+ for (size_t gen = 0; signal.wait(gen) > 0; ) {
+ hook->run();
+}
+
+}
+
} // namespace vespalib
diff --git a/vespalib/src/vespa/vespalib/util/simple_thread_bundle.h b/vespalib/src/vespa/vespalib/util/simple_thread_bundle.h
index f0aaccc2525..d9a29ee7bef 100644
--- a/vespalib/src/vespa/vespalib/util/simple_thread_bundle.h
+++ b/vespalib/src/vespa/vespalib/util/simple_thread_bundle.h
@@ -112,14 +112,8 @@ private:
Thread thread;
Signal &signal;
Runnable::UP hook;
- Worker(Signal &s, Runnable::UP h) : thread(*this), signal(s), hook(std::move(h)) {
- thread.start();
- }
- void run() override {
- for (size_t gen = 0; signal.wait(gen) > 0; ) {
- hook->run();
- }
- }
+ Worker(Signal &s, Runnable::UP h);
+ void run() override;
};
Work _work;
diff --git a/vespalib/src/vespa/vespalib/util/thread.cpp b/vespalib/src/vespa/vespalib/util/thread.cpp
index c02a7a3b063..c3230bf313d 100644
--- a/vespalib/src/vespa/vespalib/util/thread.cpp
+++ b/vespalib/src/vespa/vespalib/util/thread.cpp
@@ -9,9 +9,9 @@ namespace vespalib {
__thread Thread *Thread::_currentThread = nullptr;
-Thread::Proxy::Proxy(Thread &parent, Runnable &target)
- : thread(parent), runnable(target),
- start(), started(), cancel(false)
+Thread::Proxy::Proxy(Thread &parent, Runnable &target, init_fun_t init_fun_in)
+ : thread(parent), runnable(target), init_fun(std::move(init_fun_in)),
+ start(), started(), cancel(false)
{ }
void
@@ -22,7 +22,7 @@ Thread::Proxy::Run(FastOS_ThreadInterface *, void *)
start.await();
if (!cancel) {
started.countDown();
- runnable.run();
+ init_fun(runnable);
}
assert(_currentThread == &thread);
_currentThread = nullptr;
@@ -30,8 +30,8 @@ Thread::Proxy::Run(FastOS_ThreadInterface *, void *)
Thread::Proxy::~Proxy() = default;
-Thread::Thread(Runnable &runnable)
- : _proxy(*this, runnable),
+Thread::Thread(Runnable &runnable, init_fun_t init_fun_in)
+ : _proxy(*this, runnable, std::move(init_fun_in)),
_pool(STACK_SIZE, 1),
_lock(),
_cond(),
diff --git a/vespalib/src/vespa/vespalib/util/thread.h b/vespalib/src/vespa/vespalib/util/thread.h
index 8873f23ee98..e08f3ca1100 100644
--- a/vespalib/src/vespa/vespalib/util/thread.h
+++ b/vespalib/src/vespa/vespalib/util/thread.h
@@ -15,17 +15,19 @@ namespace vespalib {
class Thread : public Active
{
private:
+ using init_fun_t = Runnable::init_fun_t;
enum { STACK_SIZE = 256*1024 };
static __thread Thread *_currentThread;
struct Proxy : FastOS_Runnable {
Thread &thread;
Runnable &runnable;
+ init_fun_t init_fun;
vespalib::Gate start;
vespalib::Gate started;
bool cancel;
- Proxy(Thread &parent, Runnable &target);
+ Proxy(Thread &parent, Runnable &target, init_fun_t init_fun_in);
~Proxy() override;
void Run(FastOS_ThreadInterface *thisThread, void *arguments) override;
@@ -39,7 +41,7 @@ private:
bool _woken;
public:
- Thread(Runnable &runnable);
+ Thread(Runnable &runnable, init_fun_t init_fun_in);
~Thread() override;
void start() override;
Thread &stop() override;