summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2023-02-14 18:14:49 +0100
committerGitHub <noreply@github.com>2023-02-14 18:14:49 +0100
commit707663ec7c53198955868561c512cd805a1cc5ec (patch)
treedbd8406dbe6e67a12da087f042ba9b8322f041b1
parentb1ebfcf0196e3af41572cef42f17d045fdb4609f (diff)
parent98e408c667e3dcda3dc177594ffa4c4d0d99a33e (diff)
Merge pull request #26038 from vespa-engine/havardpe/stop-using-jthread-as-well
stop using fastos thread more places
-rw-r--r--config/src/vespa/config/helper/configfetcher.cpp4
-rw-r--r--config/src/vespa/config/retriever/simpleconfigurer.cpp4
-rw-r--r--fnet/src/tests/examples/examples_test.cpp5
-rw-r--r--slobrok/src/vespa/slobrok/server/slobrokserver.cpp8
-rw-r--r--storage/src/tests/persistence/filestorage/operationabortingtest.cpp4
-rw-r--r--vbench/src/apps/vbench/vbench.cpp3
-rw-r--r--vbench/src/tests/dispatcher/dispatcher_test.cpp6
-rw-r--r--vbench/src/tests/handler_thread/handler_thread_test.cpp3
-rw-r--r--vbench/src/tests/timer/timer_test.cpp3
-rw-r--r--vbench/src/vbench/core/dispatcher.hpp3
-rw-r--r--vbench/src/vbench/core/handler_thread.h7
-rw-r--r--vbench/src/vbench/core/handler_thread.hpp4
-rw-r--r--vbench/src/vbench/vbench/request_scheduler.cpp24
-rw-r--r--vbench/src/vbench/vbench/request_scheduler.h17
-rw-r--r--vbench/src/vbench/vbench/vbench.cpp5
-rw-r--r--vbench/src/vbench/vbench/vbench.h2
-rw-r--r--vbench/src/vbench/vbench/worker.cpp4
-rw-r--r--vbench/src/vbench/vbench/worker.h6
-rw-r--r--vespalib/src/tests/simple_thread_bundle/simple_thread_bundle_test.cpp1
-rw-r--r--vespalib/src/tests/singleexecutor/singleexecutor_test.cpp1
-rw-r--r--vespalib/src/tests/thread/thread_test.cpp48
-rw-r--r--vespalib/src/vespa/vespalib/testkit/test_hook.cpp25
-rw-r--r--vespalib/src/vespa/vespalib/testkit/test_hook.h13
-rw-r--r--vespalib/src/vespa/vespalib/util/CMakeLists.txt2
-rw-r--r--vespalib/src/vespa/vespalib/util/active.cpp7
-rw-r--r--vespalib/src/vespa/vespalib/util/active.h34
-rw-r--r--vespalib/src/vespa/vespalib/util/joinable.cpp7
-rw-r--r--vespalib/src/vespa/vespalib/util/joinable.h23
-rw-r--r--vespalib/src/vespa/vespalib/util/simple_thread_bundle.cpp4
-rw-r--r--vespalib/src/vespa/vespalib/util/singleexecutor.cpp9
-rw-r--r--vespalib/src/vespa/vespalib/util/singleexecutor.h6
-rw-r--r--vespalib/src/vespa/vespalib/util/thread.cpp75
-rw-r--r--vespalib/src/vespa/vespalib/util/thread.h56
33 files changed, 134 insertions, 289 deletions
diff --git a/config/src/vespa/config/helper/configfetcher.cpp b/config/src/vespa/config/helper/configfetcher.cpp
index b2cf6e1955d..ec4ea42d0bb 100644
--- a/config/src/vespa/config/helper/configfetcher.cpp
+++ b/config/src/vespa/config/helper/configfetcher.cpp
@@ -15,7 +15,7 @@ VESPA_THREAD_STACK_TAG(config_fetcher_thread);
ConfigFetcher::ConfigFetcher(std::shared_ptr<IConfigContext> context)
: _poller(std::make_unique<ConfigPoller>(std::move(context))),
- _thread(std::make_unique<vespalib::Thread>(*_poller, config_fetcher_thread)),
+ _thread(std::make_unique<vespalib::Thread>()),
_closed(false),
_started(false)
{
@@ -36,7 +36,7 @@ ConfigFetcher::start()
throw ConfigTimeoutException("ConfigFetcher::start timed out getting initial config");
}
LOG(debug, "Starting fetcher thread...");
- _thread->start();
+ *_thread = vespalib::Thread::start(*_poller, config_fetcher_thread);
_started = true;
LOG(debug, "Fetcher thread started");
}
diff --git a/config/src/vespa/config/retriever/simpleconfigurer.cpp b/config/src/vespa/config/retriever/simpleconfigurer.cpp
index 5059b9997f5..1e89f51ec03 100644
--- a/config/src/vespa/config/retriever/simpleconfigurer.cpp
+++ b/config/src/vespa/config/retriever/simpleconfigurer.cpp
@@ -13,7 +13,7 @@ VESPA_THREAD_STACK_TAG(simple_configurer_thread);
SimpleConfigurer::SimpleConfigurer(SimpleConfigRetriever::UP retriever, SimpleConfigurable * const configurable)
: _retriever(std::move(retriever)),
_configurable(configurable),
- _thread(*this, simple_configurer_thread),
+ _thread(),
_started(false)
{
assert(_retriever);
@@ -25,7 +25,7 @@ SimpleConfigurer::start()
if (!_retriever->isClosed()) {
LOG(debug, "Polling for config");
runConfigure();
- _thread.start();
+ _thread = vespalib::Thread::start(*this, simple_configurer_thread);
_started = true;
}
}
diff --git a/fnet/src/tests/examples/examples_test.cpp b/fnet/src/tests/examples/examples_test.cpp
index 4b9e2a58ef1..1b666898ff2 100644
--- a/fnet/src/tests/examples/examples_test.cpp
+++ b/fnet/src/tests/examples/examples_test.cpp
@@ -3,7 +3,8 @@
#include <vespa/vespalib/process/process.h>
#include <vespa/vespalib/util/size_literals.h>
#include <vespa/vespalib/util/stringfmt.h>
-#include <vespa/vespalib/util/thread.h>
+#include <vespa/vespalib/util/time.h>
+#include <thread>
#include <atomic>
#include <csignal>
@@ -42,7 +43,7 @@ bool run_with_retry(const vespalib::string &cmd) {
for (size_t retry = 0; retry < 60; ++retry) {
if (retry > 0) {
fprintf(stderr, "retrying command in 500ms...\n");
- vespalib::Thread::sleep(500);
+ std::this_thread::sleep_for(500ms);
}
vespalib::string output;
Process proc(cmd, true);
diff --git a/slobrok/src/vespa/slobrok/server/slobrokserver.cpp b/slobrok/src/vespa/slobrok/server/slobrokserver.cpp
index 5601336fdfd..4a986d9ba01 100644
--- a/slobrok/src/vespa/slobrok/server/slobrokserver.cpp
+++ b/slobrok/src/vespa/slobrok/server/slobrokserver.cpp
@@ -8,16 +8,16 @@ VESPA_THREAD_STACK_TAG(slobrok_server_thread);
SlobrokServer::SlobrokServer(ConfigShim &shim)
: _env(shim),
- _thread(*this, slobrok_server_thread)
+ _thread()
{
- _thread.start();
+ _thread = vespalib::Thread::start(*this, slobrok_server_thread);
}
SlobrokServer::SlobrokServer(uint32_t port)
: _env(ConfigShim(port)),
- _thread(*this, slobrok_server_thread)
+ _thread()
{
- _thread.start();
+ _thread = vespalib::Thread::start(*this, slobrok_server_thread);
}
diff --git a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp
index 000e1e1c155..d7ecb1f30f1 100644
--- a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp
+++ b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp
@@ -296,8 +296,7 @@ TEST_F(OperationAbortingTest, wait_for_current_operation_completion_for_aborted_
auto abortCmd = makeAbortCmd(abortSet);
SendTask sendTask(abortCmd, *_queueBarrier, c.top);
- vespalib::Thread thread(sendTask, test_thread);
- thread.start();
+ auto thread = vespalib::Thread::start(sendTask, test_thread);
LOG(debug, "waiting for threads to reach barriers");
_queueBarrier->await();
@@ -306,7 +305,6 @@ TEST_F(OperationAbortingTest, wait_for_current_operation_completion_for_aborted_
LOG(debug, "waiting on completion barrier");
_completionBarrier->await();
- thread.stop();
thread.join();
// If waiting works, put reply shall always be ordered before the internal
diff --git a/vbench/src/apps/vbench/vbench.cpp b/vbench/src/apps/vbench/vbench.cpp
index ffea64c6034..b5c2897207f 100644
--- a/vbench/src/apps/vbench/vbench.cpp
+++ b/vbench/src/apps/vbench/vbench.cpp
@@ -44,8 +44,7 @@ int run(const std::string &cfg_name) {
VBench vbench(cfg);
NotifyDone notify(done);
vespalib::RunnablePair runBoth(vbench, notify);
- vespalib::Thread thread(runBoth, vbench_thread);
- thread.start();
+ auto thread = vespalib::Thread::start(runBoth, vbench_thread);
while (!SIG::INT.check() && !SIG::TERM.check() && !done.await(1s)) {}
if (!done.await(vespalib::duration::zero())) {
vbench.abort();
diff --git a/vbench/src/tests/dispatcher/dispatcher_test.cpp b/vbench/src/tests/dispatcher/dispatcher_test.cpp
index 85879feb0ee..6a3fb8d0c7c 100644
--- a/vbench/src/tests/dispatcher/dispatcher_test.cpp
+++ b/vbench/src/tests/dispatcher/dispatcher_test.cpp
@@ -30,11 +30,9 @@ TEST("dispatcher") {
Dispatcher<int> dispatcher(dropped);
Fetcher fetcher1(dispatcher, handler1);
Fetcher fetcher2(dispatcher, handler2);
- vespalib::Thread thread1(fetcher1, fetcher1_thread);
- vespalib::Thread thread2(fetcher2, fetcher2_thread);
- thread1.start();
+ auto thread1 = vespalib::Thread::start(fetcher1, fetcher1_thread);
EXPECT_TRUE(dispatcher.waitForThreads(1, 512));
- thread2.start();
+ auto thread2 = vespalib::Thread::start(fetcher2, fetcher2_thread);
EXPECT_TRUE(dispatcher.waitForThreads(2, 512));
EXPECT_EQUAL(-1, dropped.value);
EXPECT_EQUAL(-1, handler1.value);
diff --git a/vbench/src/tests/handler_thread/handler_thread_test.cpp b/vbench/src/tests/handler_thread/handler_thread_test.cpp
index 97a12e82ac8..497b7db2883 100644
--- a/vbench/src/tests/handler_thread/handler_thread_test.cpp
+++ b/vbench/src/tests/handler_thread/handler_thread_test.cpp
@@ -1,6 +1,7 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <vespa/vespalib/testkit/testapp.h>
#include <vbench/test/all.h>
+#include <vespa/vespalib/util/time.h>
using namespace vbench;
@@ -9,7 +10,7 @@ struct MyHandler : Handler<int> {
~MyHandler() override;
void handle(std::unique_ptr<int> value) override {
values.push_back(*value);
- vespalib::Thread::sleep(10); // for improved coverage
+ std::this_thread::sleep_for(10ms);
}
};
diff --git a/vbench/src/tests/timer/timer_test.cpp b/vbench/src/tests/timer/timer_test.cpp
index 43e7b76caa0..eda0564d2d8 100644
--- a/vbench/src/tests/timer/timer_test.cpp
+++ b/vbench/src/tests/timer/timer_test.cpp
@@ -1,13 +1,14 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <vespa/vespalib/testkit/testapp.h>
#include <vbench/test/all.h>
+#include <vespa/vespalib/util/time.h>
using namespace vbench;
IGNORE_TEST("timer") {
Timer timer;
EXPECT_APPROX(0.0, timer.sample(), 0.1);
- vespalib::Thread::sleep(1000);
+ std::this_thread::sleep_for(1000ms);
EXPECT_APPROX(1.0, timer.sample(), 0.1);
timer.reset();
EXPECT_APPROX(0.0, timer.sample(), 0.1);
diff --git a/vbench/src/vbench/core/dispatcher.hpp b/vbench/src/vbench/core/dispatcher.hpp
index aa5afcd6d67..572e9f3381d 100644
--- a/vbench/src/vbench/core/dispatcher.hpp
+++ b/vbench/src/vbench/core/dispatcher.hpp
@@ -1,6 +1,7 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <vespa/vespalib/util/thread.h>
+#include <vespa/vespalib/util/time.h>
namespace vbench {
@@ -22,7 +23,7 @@ Dispatcher<T>::waitForThreads(size_t threads, size_t pollCnt) const
{
for (size_t i = 0; i < pollCnt; ++i) {
if (i != 0) {
- vespalib::Thread::sleep(20);
+ std::this_thread::sleep_for(20ms);
}
{
std::lock_guard guard(_lock);
diff --git a/vbench/src/vbench/core/handler_thread.h b/vbench/src/vbench/core/handler_thread.h
index 402ecbeb0dc..8ece1389dfc 100644
--- a/vbench/src/vbench/core/handler_thread.h
+++ b/vbench/src/vbench/core/handler_thread.h
@@ -6,7 +6,7 @@
#include <vespa/vespalib/util/arrayqueue.hpp>
#include <vespa/vespalib/util/thread.h>
#include <vespa/vespalib/util/runnable.h>
-#include <vespa/vespalib/util/joinable.h>
+#include <condition_variable>
namespace vbench {
@@ -19,8 +19,7 @@ namespace vbench {
**/
template <typename T>
class HandlerThread : public Handler<T>,
- public vespalib::Runnable,
- public vespalib::Joinable
+ public vespalib::Runnable
{
private:
std::mutex _lock;
@@ -36,7 +35,7 @@ public:
HandlerThread(Handler<T> &next, init_fun_t init_fun);
~HandlerThread();
void handle(std::unique_ptr<T> obj) override;
- void join() override;
+ void join();
};
} // namespace vbench
diff --git a/vbench/src/vbench/core/handler_thread.hpp b/vbench/src/vbench/core/handler_thread.hpp
index 56cc0a7771d..1a99861ea81 100644
--- a/vbench/src/vbench/core/handler_thread.hpp
+++ b/vbench/src/vbench/core/handler_thread.hpp
@@ -28,10 +28,10 @@ HandlerThread<T>::HandlerThread(Handler<T> &next, init_fun_t init_fun)
_cond(),
_queue(),
_next(next),
- _thread(*this, init_fun),
+ _thread(),
_done(false)
{
- _thread.start();
+ _thread = vespalib::Thread::start(*this, init_fun);
}
template <typename T>
diff --git a/vbench/src/vbench/vbench/request_scheduler.cpp b/vbench/src/vbench/vbench/request_scheduler.cpp
index 95d29181b1f..cde31ec07b8 100644
--- a/vbench/src/vbench/vbench/request_scheduler.cpp
+++ b/vbench/src/vbench/vbench/request_scheduler.cpp
@@ -2,6 +2,7 @@
#include "request_scheduler.h"
#include <vbench/core/timer.h>
+#include <vespa/vespalib/util/time.h>
namespace vbench {
@@ -13,14 +14,18 @@ RequestScheduler::run()
{
double sleepTime;
std::vector<Request::UP> list;
- vespalib::Thread &thread = vespalib::Thread::currentThread();
while (_queue.extract(_timer.sample(), list, sleepTime)) {
for (size_t i = 0; i < list.size(); ++i) {
Request::UP request = Request::UP(list[i].release());
_dispatcher.handle(std::move(request));
}
list.clear();
- thread.slumber(sleepTime);
+ {
+ auto guard = std::unique_lock(_lock);
+ if (_may_slumber) {
+ _cond.wait_for(guard, std::chrono::duration<double,std::milli>(sleepTime));
+ }
+ }
}
}
@@ -30,9 +35,12 @@ RequestScheduler::RequestScheduler(CryptoEngine::SP crypto, Handler<Request> &ne
_queue(10.0, 0.020),
_droppedTagger(_proxy),
_dispatcher(_droppedTagger),
- _thread(*this, vbench_request_scheduler_thread),
+ _thread(),
_connectionPool(std::move(crypto), _timer),
- _workers()
+ _workers(),
+ _lock(),
+ _cond(),
+ _may_slumber(true)
{
for (size_t i = 0; i < numWorkers; ++i) {
_workers.push_back(std::make_unique<Worker>(_dispatcher, _proxy, _connectionPool, _timer));
@@ -45,7 +53,11 @@ RequestScheduler::abort()
{
_queue.close();
_queue.discard();
- _thread.stop();
+ {
+ auto guard = std::lock_guard(_lock);
+ _may_slumber = false;
+ _cond.notify_all();
+ }
}
void
@@ -59,7 +71,7 @@ void
RequestScheduler::start()
{
_timer.reset();
- _thread.start();
+ _thread = vespalib::Thread::start(*this, vbench_request_scheduler_thread);
}
RequestScheduler &
diff --git a/vbench/src/vbench/vbench/request_scheduler.h b/vbench/src/vbench/vbench/request_scheduler.h
index a0fb1eda4e7..b1d525eb691 100644
--- a/vbench/src/vbench/vbench/request_scheduler.h
+++ b/vbench/src/vbench/vbench/request_scheduler.h
@@ -7,7 +7,8 @@
#include <vbench/core/time_queue.h>
#include <vbench/core/dispatcher.h>
#include <vbench/core/handler_thread.h>
-#include <vespa/vespalib/util/active.h>
+#include <mutex>
+#include <condition_variable>
namespace vbench {
@@ -17,8 +18,7 @@ namespace vbench {
* with.
**/
class RequestScheduler : public Handler<Request>,
- public vespalib::Runnable,
- public vespalib::Active
+ public vespalib::Runnable
{
private:
Timer _timer;
@@ -29,7 +29,10 @@ private:
vespalib::Thread _thread;
HttpConnectionPool _connectionPool;
std::vector<Worker::UP> _workers;
-
+ std::mutex _lock;
+ std::condition_variable _cond;
+ bool _may_slumber;
+
void run() override;
public:
using UP = std::unique_ptr<RequestScheduler>;
@@ -37,9 +40,9 @@ public:
RequestScheduler(CryptoEngine::SP crypto, Handler<Request> &next, size_t numWorkers);
void abort();
void handle(Request::UP request) override;
- void start() override;
- RequestScheduler &stop() override;
- void join() override;
+ void start();
+ RequestScheduler &stop();
+ void join();
};
} // namespace vbench
diff --git a/vbench/src/vbench/vbench/vbench.cpp b/vbench/src/vbench/vbench/vbench.cpp
index 9a5adad262e..8a8bceccefb 100644
--- a/vbench/src/vbench/vbench/vbench.cpp
+++ b/vbench/src/vbench/vbench/vbench.cpp
@@ -78,7 +78,6 @@ VBench::VBench(const vespalib::Slime &cfg)
}
inputChain->generator = _factory.createGenerator(generator, *inputChain->taggers.back());
if (inputChain->generator.get() != 0) {
- inputChain->thread.reset(new vespalib::Thread(*inputChain->generator, vbench_inputchain_generator));
_inputs.push_back(std::move(inputChain));
}
}
@@ -101,10 +100,10 @@ VBench::run()
{
_scheduler->start();
for (size_t i = 0; i < _inputs.size(); ++i) {
- _inputs[i]->thread->start();
+ _inputs[i]->thread = vespalib::Thread::start(*_inputs[i]->generator, vbench_inputchain_generator);
}
for (size_t i = 0; i < _inputs.size(); ++i) {
- _inputs[i]->thread->join();
+ _inputs[i]->thread.join();
}
_scheduler->stop().join();
for (size_t i = 0; i < _inputs.size(); ++i) {
diff --git a/vbench/src/vbench/vbench/vbench.h b/vbench/src/vbench/vbench/vbench.h
index dbb46e72800..2b7fcf0cd88 100644
--- a/vbench/src/vbench/vbench/vbench.h
+++ b/vbench/src/vbench/vbench/vbench.h
@@ -26,7 +26,7 @@ private:
using UP = std::unique_ptr<InputChain>;
std::vector<Tagger::UP> taggers;
Generator::UP generator;
- std::unique_ptr<vespalib::Thread> thread;
+ vespalib::Thread thread;
};
NativeFactory _factory;
std::vector<Analyzer::UP> _analyzers;
diff --git a/vbench/src/vbench/vbench/worker.cpp b/vbench/src/vbench/vbench/worker.cpp
index afccc7de39f..eabd17ae73f 100644
--- a/vbench/src/vbench/vbench/worker.cpp
+++ b/vbench/src/vbench/vbench/worker.cpp
@@ -24,13 +24,13 @@ Worker::run()
Worker::Worker(Provider<Request> &provider, Handler<Request> &next,
HttpConnectionPool &pool, Timer &timer)
- : _thread(*this, vbench_worker_thread),
+ : _thread(),
_provider(provider),
_next(next),
_pool(pool),
_timer(timer)
{
- _thread.start();
+ _thread = vespalib::Thread::start(*this, vbench_worker_thread);
}
} // namespace vbench
diff --git a/vbench/src/vbench/vbench/worker.h b/vbench/src/vbench/vbench/worker.h
index 6594a4f3dd6..d2bcfae637b 100644
--- a/vbench/src/vbench/vbench/worker.h
+++ b/vbench/src/vbench/vbench/worker.h
@@ -8,7 +8,6 @@
#include <vbench/http/http_connection_pool.h>
#include <vespa/vespalib/util/runnable.h>
#include <vespa/vespalib/util/thread.h>
-#include <vespa/vespalib/util/joinable.h>
namespace vbench {
@@ -18,8 +17,7 @@ namespace vbench {
* internal thread that will stop when the request provider starts
* handing out empty requests.
**/
-class Worker : public vespalib::Runnable,
- public vespalib::Joinable
+class Worker : public vespalib::Runnable
{
private:
vespalib::Thread _thread;
@@ -33,7 +31,7 @@ public:
using UP = std::unique_ptr<Worker>;
Worker(Provider<Request> &provider, Handler<Request> &next,
HttpConnectionPool &pool, Timer &timer);
- void join() override { _thread.join(); }
+ void join() { _thread.join(); }
};
} // namespace vbench
diff --git a/vespalib/src/tests/simple_thread_bundle/simple_thread_bundle_test.cpp b/vespalib/src/tests/simple_thread_bundle/simple_thread_bundle_test.cpp
index 59aeddfe8ca..e451f1e033d 100644
--- a/vespalib/src/tests/simple_thread_bundle/simple_thread_bundle_test.cpp
+++ b/vespalib/src/tests/simple_thread_bundle/simple_thread_bundle_test.cpp
@@ -4,6 +4,7 @@
#include <vespa/vespalib/util/exceptions.h>
#include <vespa/vespalib/util/box.h>
#include <vespa/vespalib/util/small_vector.h>
+#include <vespa/vespalib/util/gate.h>
#include <thread>
#include <forward_list>
diff --git a/vespalib/src/tests/singleexecutor/singleexecutor_test.cpp b/vespalib/src/tests/singleexecutor/singleexecutor_test.cpp
index 56352ff3c0d..3b1d244eb13 100644
--- a/vespalib/src/tests/singleexecutor/singleexecutor_test.cpp
+++ b/vespalib/src/tests/singleexecutor/singleexecutor_test.cpp
@@ -5,6 +5,7 @@
#include <vespa/vespalib/util/singleexecutor.h>
#include <vespa/vespalib/util/lambdatask.h>
#include <vespa/vespalib/util/alloc.h>
+#include <vespa/vespalib/util/gate.h>
#include <atomic>
using namespace vespalib;
diff --git a/vespalib/src/tests/thread/thread_test.cpp b/vespalib/src/tests/thread/thread_test.cpp
index af1fb626462..f7e8753fd86 100644
--- a/vespalib/src/tests/thread/thread_test.cpp
+++ b/vespalib/src/tests/thread/thread_test.cpp
@@ -2,56 +2,48 @@
#include <vespa/vespalib/testkit/test_kit.h>
#include <vespa/vespalib/util/thread.h>
-#include <thread>
using namespace vespalib;
VESPA_THREAD_STACK_TAG(test_agent_thread);
struct Agent : public Runnable {
- bool started;
- int loopCnt;
- Agent() : started(false), loopCnt(0) {}
+ bool was_run;
+ Agent() : was_run(false) {}
void run() override {
- started = true;
- Thread &thread = Thread::currentThread();
- while (thread.slumber(60.0)) {
- ++loopCnt;
- }
+ was_run = true;
}
};
-TEST("thread never started") {
+void my_fun(bool *was_run) {
+ *was_run = true;
+}
+
+TEST("run vespalib::Runnable with init function") {
Agent agent;
{
- Thread thread(agent, test_agent_thread);
+ auto thread = Thread::start(agent, test_agent_thread);
}
- EXPECT_TRUE(!agent.started);
- EXPECT_EQUAL(0, agent.loopCnt);
+ EXPECT_TRUE(agent.was_run);
}
-TEST("normal operation") {
- Agent agent;
+TEST("run custom function") {
+ bool was_run = false;
{
- Thread thread(agent, test_agent_thread);
- thread.start();
- std::this_thread::sleep_for(20ms);
- thread.stop().join();
+ auto thread = Thread::start(my_fun, &was_run);
}
- EXPECT_TRUE(agent.started);
- EXPECT_EQUAL(0, agent.loopCnt);
+ EXPECT_TRUE(was_run);
}
-TEST("stop before start") {
- Agent agent;
+TEST("join multiple times (including destructor)") {
+ bool was_run = false;
{
- Thread thread(agent, test_agent_thread);
- thread.stop();
- thread.start();
+ auto thread = Thread::start(my_fun, &was_run);
+ thread.join();
+ thread.join();
thread.join();
}
- EXPECT_TRUE(!agent.started);
- EXPECT_EQUAL(0, agent.loopCnt);
+ EXPECT_TRUE(was_run);
}
TEST_MAIN() { TEST_RUN_ALL(); }
diff --git a/vespalib/src/vespa/vespalib/testkit/test_hook.cpp b/vespalib/src/vespa/vespalib/testkit/test_hook.cpp
index 4e33897d869..5b618beb01b 100644
--- a/vespalib/src/vespa/vespalib/testkit/test_hook.cpp
+++ b/vespalib/src/vespa/vespalib/testkit/test_hook.cpp
@@ -4,31 +4,9 @@
#include <vespa/vespalib/util/stringfmt.h>
#include <vespa/vespalib/util/size_literals.h>
#include <regex>
-#include <vespa/fastos/thread.h>
namespace vespalib {
-namespace {
-
-struct FastOSTestThreadRunner : FastOS_Runnable {
- TestThreadEntry &entry;
- FastOSTestThreadRunner(TestThreadEntry &entry_in) : entry(entry_in) {}
- bool DeleteOnCompletion() const override { return true; }
- void Run(FastOS_ThreadInterface *, void *) override { entry.threadEntry(); }
-};
-
-struct FastOSTestThreadFactory : TestThreadFactory {
- FastOS_ThreadPool threadPool;
- FastOSTestThreadFactory() : threadPool() {}
- void createThread(TestThreadEntry &entry) override {
- threadPool.NewThread(new FastOSTestThreadRunner(entry), 0);
- }
-};
-
-} // namespace vespalib::<unnamed>
-
-__thread TestThreadFactory *TestThreadFactory::factory = 0;
-
void
TestThreadWrapper::threadEntry()
{
@@ -96,8 +74,6 @@ const char *lookup_subset_pattern(const std::string &name) {
void
TestHook::runAll()
{
- FastOSTestThreadFactory threadFactory;
- TestThreadFactory::factory = &threadFactory;
std::string name = TestMaster::master.getName();
std::regex pattern(lookup_subset_pattern(name));
size_t testsPassed = 0;
@@ -134,7 +110,6 @@ TestHook::runAll()
fprintf(stderr, "%s: Warn: test summary --- %zu test(s) ignored\n",
name.c_str(), testsIgnored);
}
- TestThreadFactory::factory = 0;
}
} // namespace vespalib
diff --git a/vespalib/src/vespa/vespalib/testkit/test_hook.h b/vespalib/src/vespa/vespalib/testkit/test_hook.h
index d0e04ede2e9..8a5c6c1e684 100644
--- a/vespalib/src/vespa/vespalib/testkit/test_hook.h
+++ b/vespalib/src/vespa/vespalib/testkit/test_hook.h
@@ -4,6 +4,8 @@
#include <vespa/vespalib/util/count_down_latch.h>
#include <vespa/vespalib/util/barrier.h>
+#include <vespa/vespalib/util/thread.h>
+#include <thread>
#include <string>
#include <vector>
#include <cassert>
@@ -16,12 +18,6 @@ struct TestThreadEntry {
virtual ~TestThreadEntry() {}
};
-struct TestThreadFactory {
- static __thread TestThreadFactory *factory;
- virtual void createThread(TestThreadEntry &entry) = 0;
- virtual ~TestThreadFactory() {}
-};
-
struct TestFixtureWrapper {
size_t thread_id;
size_t num_threads;
@@ -82,8 +78,10 @@ protected:
Barrier barrier(num_threads);
std::vector<FixtureUP> fixtures;
std::vector<ThreadUP> threads;
+ std::vector<Thread> thread_handles;
threads.reserve(num_threads);
fixtures.reserve(num_threads);
+ thread_handles.reserve(num_threads - 1);
for (size_t i = 0; i < num_threads; ++i) {
FixtureUP fixture_up(new T(fixture));
fixture_up->thread_id = i;
@@ -92,8 +90,7 @@ protected:
fixtures.push_back(std::move(fixture_up));
}
for (size_t i = 1; i < num_threads; ++i) {
- assert(TestThreadFactory::factory != 0);
- TestThreadFactory::factory->createThread(*threads[i]);
+ thread_handles.push_back(Thread::start([&target = *threads[i]](){ target.threadEntry(); }));
}
threads[0]->threadEntry();
latch.await();
diff --git a/vespalib/src/vespa/vespalib/util/CMakeLists.txt b/vespalib/src/vespa/vespalib/util/CMakeLists.txt
index ad2db89288c..73e8b93a2ff 100644
--- a/vespalib/src/vespa/vespalib/util/CMakeLists.txt
+++ b/vespalib/src/vespa/vespalib/util/CMakeLists.txt
@@ -1,7 +1,6 @@
# Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
vespa_add_library(vespalib_vespalib_util OBJECT
SOURCES
- active.cpp
adaptive_sequenced_executor.cpp
address_space.cpp
alloc.cpp
@@ -45,7 +44,6 @@ vespa_add_library(vespalib_vespalib_util OBJECT
invokeserviceimpl.cpp
isequencedtaskexecutor.cpp
issue.cpp
- joinable.cpp
jsonexception.cpp
jsonstream.cpp
jsonwriter.cpp
diff --git a/vespalib/src/vespa/vespalib/util/active.cpp b/vespalib/src/vespa/vespalib/util/active.cpp
deleted file mode 100644
index 48785c74b79..00000000000
--- a/vespalib/src/vespa/vespalib/util/active.cpp
+++ /dev/null
@@ -1,7 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include "active.h"
-
-namespace vespalib {
-
-} // namespace vespalib
diff --git a/vespalib/src/vespa/vespalib/util/active.h b/vespalib/src/vespa/vespalib/util/active.h
deleted file mode 100644
index 1fbff9514d7..00000000000
--- a/vespalib/src/vespa/vespalib/util/active.h
+++ /dev/null
@@ -1,34 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#pragma once
-
-#include "joinable.h"
-
-namespace vespalib {
-
-/**
- * Interface used to abstract entities that are the source of
- * activity.
- **/
-struct Active : Joinable {
- /**
- * Start activity.
- **/
- virtual void start() = 0;
-
- /**
- * Request that activity stops. The returned object can be used to
- * wait for the actual conclusion of the activity.
- *
- * @return object that can be used to wait for activity completion
- **/
- virtual Joinable &stop() = 0;
-
- /**
- * Empty virtual destructor to enable subclassing.
- **/
- virtual ~Active() {}
-};
-
-} // namespace vespalib
-
diff --git a/vespalib/src/vespa/vespalib/util/joinable.cpp b/vespalib/src/vespa/vespalib/util/joinable.cpp
deleted file mode 100644
index 58112660389..00000000000
--- a/vespalib/src/vespa/vespalib/util/joinable.cpp
+++ /dev/null
@@ -1,7 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include "joinable.h"
-
-namespace vespalib {
-
-} // namespace vespalib
diff --git a/vespalib/src/vespa/vespalib/util/joinable.h b/vespalib/src/vespa/vespalib/util/joinable.h
deleted file mode 100644
index 275ae740d26..00000000000
--- a/vespalib/src/vespa/vespalib/util/joinable.h
+++ /dev/null
@@ -1,23 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#pragma once
-
-namespace vespalib {
-
-/**
- * Concurrent activity that we can wait for a conclusion of.
- **/
-struct Joinable {
- /**
- * Wait for the conclusion of this concurrent activity
- **/
- virtual void join() = 0;
-
- /**
- * Empty virtual destructor to enable subclassing.
- **/
- virtual ~Joinable() {}
-};
-
-} // namespace vespalib
-
diff --git a/vespalib/src/vespa/vespalib/util/simple_thread_bundle.cpp b/vespalib/src/vespa/vespalib/util/simple_thread_bundle.cpp
index 8a66a4f6898..becb5d2ab74 100644
--- a/vespalib/src/vespa/vespalib/util/simple_thread_bundle.cpp
+++ b/vespalib/src/vespa/vespalib/util/simple_thread_bundle.cpp
@@ -178,11 +178,11 @@ SimpleThreadBundle::run(Runnable* const* targets, size_t cnt)
}
SimpleThreadBundle::Worker::Worker(Signal &s, Runnable::init_fun_t init_fun, Runnable::UP h)
- : thread(*this, std::move(init_fun)),
+ : thread(),
signal(s),
hook(std::move(h))
{
- thread.start();
+ thread = Thread::start(*this, std::move(init_fun));
}
void
diff --git a/vespalib/src/vespa/vespalib/util/singleexecutor.cpp b/vespalib/src/vespa/vespalib/util/singleexecutor.cpp
index c2f83bbcf09..298226d8805 100644
--- a/vespalib/src/vespa/vespalib/util/singleexecutor.cpp
+++ b/vespalib/src/vespa/vespalib/util/singleexecutor.cpp
@@ -19,7 +19,8 @@ SingleExecutor::SingleExecutor(init_fun_t func, uint32_t reservedQueueSize, bool
_mutex(),
_consumerCondition(),
_producerCondition(),
- _thread(*this, func),
+ _thread(),
+ _stopped(false),
_idleTracker(steady_clock::now()),
_threadIdleTracker(),
_wakeupCount(0),
@@ -37,13 +38,13 @@ SingleExecutor::SingleExecutor(init_fun_t func, uint32_t reservedQueueSize, bool
if ( ! isQueueSizeHard) {
_overflow = std::make_unique<ArrayQueue<Task::UP>>();
}
- _thread.start();
+ _thread = Thread::start(*this, func);
}
SingleExecutor::~SingleExecutor() {
shutdown();
sync();
- _thread.stop();
+ stop();
_consumerCondition.notify_one();
_thread.join();
}
@@ -140,7 +141,7 @@ SingleExecutor::shutdown() {
void
SingleExecutor::run() {
- while (!_thread.stopped()) {
+ while (!stopped()) {
drain_tasks();
_producerCondition.notify_all();
_wakeupConsumerAt.store(_wp.load(std::memory_order_relaxed) + get_watermark(), std::memory_order_relaxed);
diff --git a/vespalib/src/vespa/vespalib/util/singleexecutor.h b/vespalib/src/vespa/vespalib/util/singleexecutor.h
index dd755a76302..051f506e90a 100644
--- a/vespalib/src/vespa/vespalib/util/singleexecutor.h
+++ b/vespalib/src/vespa/vespalib/util/singleexecutor.h
@@ -9,6 +9,8 @@
#include <vespa/vespalib/util/executor_idle_tracking.h>
#include <thread>
#include <atomic>
+#include <mutex>
+#include <condition_variable>
namespace vespalib {
@@ -38,6 +40,8 @@ private:
using Lock = std::unique_lock<std::mutex>;
void drain(Lock & lock);
void run() override;
+ void stop() { _stopped = true; }
+ bool stopped() const { return _stopped.load(std::memory_order_relaxed); }
void drain_tasks();
void sleepProducer(Lock & guard, duration maxWaitTime, uint64_t wakeupAt);
void run_tasks_till(uint64_t available);
@@ -48,7 +52,6 @@ private:
uint64_t index(uint64_t counter) const {
return counter & (_taskLimit.load(std::memory_order_relaxed) - 1);
}
-
uint64_t numTasks();
uint64_t numTasks(Lock & guard) const {
return num_tasks_in_main_q() + num_tasks_in_overflow_q(guard);
@@ -68,6 +71,7 @@ private:
std::condition_variable _consumerCondition;
std::condition_variable _producerCondition;
vespalib::Thread _thread;
+ std::atomic<bool> _stopped;
ExecutorIdleTracker _idleTracker;
ThreadIdleTracker _threadIdleTracker;
uint64_t _wakeupCount;
diff --git a/vespalib/src/vespa/vespalib/util/thread.cpp b/vespalib/src/vespa/vespalib/util/thread.cpp
index b6a491ee83d..cad24c5bcda 100644
--- a/vespalib/src/vespa/vespalib/util/thread.cpp
+++ b/vespalib/src/vespa/vespalib/util/thread.cpp
@@ -1,56 +1,14 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "thread.h"
-#include "time.h"
-#include <thread>
-#include <cassert>
namespace vespalib {
-__thread Thread *Thread::_currentThread = nullptr;
-
-void
-Thread::run()
-{
- assert(_currentThread == nullptr);
- _currentThread = this;
- _start.await();
- if (!stopped()) {
- _init_fun(_runnable);
- }
- assert(_currentThread == this);
- _currentThread = nullptr;
-}
-
-Thread::Thread(Runnable &runnable, init_fun_t init_fun_in)
- : _runnable(runnable),
- _init_fun(std::move(init_fun_in)),
- _start(),
- _lock(),
- _cond(),
- _stopped(false),
- _woken(false),
- _thread(&Thread::run, this)
-{
-}
-
-Thread::~Thread()
-{
- stop().start();
-}
-
-void
-Thread::start()
-{
- _start.countDown();
-}
-
Thread &
-Thread::stop()
+Thread::operator=(Thread &&rhs) noexcept
{
- std::unique_lock guard(_lock);
- _stopped.store(true, std::memory_order_relaxed);
- _cond.notify_all();
+ // may call std::terminate
+ _thread = std::move(rhs._thread);
return *this;
}
@@ -62,32 +20,15 @@ Thread::join()
}
}
-bool
-Thread::slumber(double s)
-{
- std::unique_lock guard(_lock);
- if (!stopped() || _woken) {
- if (_cond.wait_for(guard, from_s(s)) == std::cv_status::no_timeout) {
- _woken = stopped();
- }
- } else {
- _woken = true;
- }
- return !stopped();
-}
-
-Thread &
-Thread::currentThread()
+Thread::~Thread()
{
- Thread *thread = _currentThread;
- assert(thread != nullptr);
- return *thread;
+ join();
}
-void
-Thread::sleep(size_t ms)
+Thread
+Thread::start(Runnable &runnable, Runnable::init_fun_t init_fun)
{
- std::this_thread::sleep_for(std::chrono::milliseconds(ms));
+ return start([&runnable, init_fun](){ init_fun(runnable); });
}
} // namespace vespalib
diff --git a/vespalib/src/vespa/vespalib/util/thread.h b/vespalib/src/vespa/vespalib/util/thread.h
index c03f1c6e65c..6db1c1d54b4 100644
--- a/vespalib/src/vespa/vespalib/util/thread.h
+++ b/vespalib/src/vespa/vespalib/util/thread.h
@@ -2,46 +2,42 @@
#pragma once
-#include "gate.h"
#include "runnable.h"
-#include "active.h"
-#include <atomic>
#include <thread>
+#include <concepts>
namespace vespalib {
/**
- * Abstraction of the concept of running a single thread.
+ * Thin thread abstraction that takes some things from std::thread
+ * (not allowed to assign to a running thread), some things from
+ * std::jthread (destructor does automatic join) and some things from
+ * now deprecated thread pools (the join function can be called
+ * multiple times and will only join the underlying thread if it is
+ * joinable). Enables starting a thread either by using a runnable and
+ * an init function or by forwarding directly to the std::thread
+ * constructor. Note that this class does not handle cancellation.
**/
-class Thread : public Active
+class Thread
{
private:
- using init_fun_t = Runnable::init_fun_t;
- static __thread Thread *_currentThread;
-
- Runnable &_runnable;
- init_fun_t _init_fun;
- vespalib::Gate _start;
- std::mutex _lock;
- std::condition_variable _cond;
- std::atomic<bool> _stopped;
- bool _woken;
- std::jthread _thread;
-
- void run();
-
+ std::thread _thread;
+ Thread(std::thread &&thread) noexcept : _thread(std::move(thread)) {}
public:
- Thread(Runnable &runnable, init_fun_t init_fun_in);
- ~Thread() override;
- void start() override;
- Thread &stop() override;
- void join() override;
- [[nodiscard]] bool stopped() const noexcept {
- return _stopped.load(std::memory_order_relaxed);
- }
- bool slumber(double s);
- static Thread &currentThread();
- static void sleep(size_t ms);
+ Thread() noexcept : _thread() {}
+ Thread(const Thread &rhs) = delete;
+ Thread(Thread &&rhs) noexcept : Thread(std::move(rhs._thread)) {}
+ std::thread::id get_id() const noexcept { return _thread.get_id(); }
+ Thread &operator=(const Thread &rhs) = delete;
+ Thread &operator=(Thread &&rhs) noexcept;
+ void join();
+ ~Thread();
+ [[nodiscard]] static Thread start(Runnable &runnable, Runnable::init_fun_t init_fun_t);
+ template<typename F, typename... Args>
+ requires std::invocable<F,Args...>
+ [[nodiscard]] static Thread start(F &&f, Args && ... args) {
+ return Thread(std::thread(std::forward<F>(f), std::forward<Args>(args)...));
+ };
};
} // namespace vespalib