diff options
author | Håvard Pettersen <havardpe@yahooinc.com> | 2023-06-09 09:56:09 +0000 |
---|---|---|
committer | Håvard Pettersen <havardpe@yahooinc.com> | 2023-06-09 09:56:09 +0000 |
commit | 1ec4141d033113a5da90f1d28c48add4ee3e3755 (patch) | |
tree | edc6e3dd9c5c0e9441697e51993e36371775706f /fnet/src/examples | |
parent | eae78b80d5d94e4cff9eb112637be20eb998712d (diff) |
added AutoJoiner tool locally (might be moved to vespalib)
- make sure detached threads are done before exiting the application
- co-operative auto-joining to limit the number of unjoined threads
Diffstat (limited to 'fnet/src/examples')
-rw-r--r-- | fnet/src/examples/frt/rpc/rpc_callback_server.cpp | 78 |
1 files changed, 77 insertions, 1 deletions
diff --git a/fnet/src/examples/frt/rpc/rpc_callback_server.cpp b/fnet/src/examples/frt/rpc/rpc_callback_server.cpp index 5c21f73da7f..c0504f49c2b 100644 --- a/fnet/src/examples/frt/rpc/rpc_callback_server.cpp +++ b/fnet/src/examples/frt/rpc/rpc_callback_server.cpp @@ -7,10 +7,85 @@ #include <vespa/vespalib/util/signalhandler.h> #include <thread> +#include <mutex> +#include <condition_variable> +#include <future> #include <vespa/log/log.h> LOG_SETUP("rpc_callback_server"); +/** + * Class keeping track of 'detached' threads in order to wait for + * their completion on program shutdown. Threads are not actually + * detached, but perform co-operative auto-joining on completion. + **/ +class AutoJoiner +{ +private: + std::mutex _lock; + std::condition_variable _cond; + bool _closed; + size_t _pending; + std::thread _thread; + struct JoinGuard { + std::thread thread; + ~JoinGuard() { + if (thread.joinable()) { + assert(std::this_thread::get_id() != thread.get_id()); + thread.join(); + } + } + }; + void notify_start() { + std::lock_guard guard(_lock); + if (!_closed) { + ++_pending; + } else { + throw std::runtime_error("no new threads allowed"); + } + } + void notify_done(std::thread thread) { + JoinGuard join; + std::unique_lock guard(_lock); + join.thread = std::move(_thread); + _thread = std::move(thread); + if (--_pending == 0 && _closed) { + _cond.notify_all(); + } + } + auto wrap_task(auto task, std::promise<std::thread> &promise) { + return [future = promise.get_future(), task = std::move(task), &owner = *this]() mutable + { + auto thread = future.get(); + assert(std::this_thread::get_id() == thread.get_id()); + task(); + owner.notify_done(std::move(thread)); + }; + } +public: + AutoJoiner() : _lock(), _cond(), _closed(false), _pending(0), _thread() {} + ~AutoJoiner() { close_and_wait(); } + void start(auto task) { + notify_start(); + std::promise<std::thread> promise; + promise.set_value(std::thread(wrap_task(std::move(task), promise))); + }; + void close_and_wait() { + JoinGuard join; + std::unique_lock guard(_lock); + _closed = true; + while (_pending > 0) { + _cond.wait(guard); + } + std::swap(join.thread, _thread); + } +}; + +AutoJoiner &auto_joiner() { + static AutoJoiner obj; + return obj; +} + struct RPC : public FRT_Invokable { void CallBack(FRT_RPCRequest *req); @@ -35,7 +110,7 @@ void RPC::CallBack(FRT_RPCRequest *req) { req->Detach(); - std::thread(do_callback, req).detach(); + auto_joiner().start([req]{ do_callback(req); }); } void @@ -53,6 +128,7 @@ class MyApp { public: int main(int argc, char **argv); + ~MyApp() { auto_joiner().close_and_wait(); } }; int |