aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHåvard Pettersen <havardpe@yahooinc.com>2023-06-09 09:56:09 +0000
committerHåvard Pettersen <havardpe@yahooinc.com>2023-06-09 09:56:09 +0000
commit1ec4141d033113a5da90f1d28c48add4ee3e3755 (patch)
treeedc6e3dd9c5c0e9441697e51993e36371775706f
parenteae78b80d5d94e4cff9eb112637be20eb998712d (diff)
added AutoJoiner tool locally (might be moved to vespalib)
- make sure detached threads are done before exiting the application - co-operative auto-joining to limit the number of unjoined threads
-rw-r--r--fnet/src/examples/frt/rpc/rpc_callback_server.cpp78
1 files changed, 77 insertions, 1 deletions
diff --git a/fnet/src/examples/frt/rpc/rpc_callback_server.cpp b/fnet/src/examples/frt/rpc/rpc_callback_server.cpp
index 5c21f73da7f..c0504f49c2b 100644
--- a/fnet/src/examples/frt/rpc/rpc_callback_server.cpp
+++ b/fnet/src/examples/frt/rpc/rpc_callback_server.cpp
@@ -7,10 +7,85 @@
#include <vespa/vespalib/util/signalhandler.h>
#include <thread>
+#include <mutex>
+#include <condition_variable>
+#include <future>
#include <vespa/log/log.h>
LOG_SETUP("rpc_callback_server");
+/**
+ * Class keeping track of 'detached' threads in order to wait for
+ * their completion on program shutdown. Threads are not actually
+ * detached, but perform co-operative auto-joining on completion.
+ **/
+class AutoJoiner
+{
+private:
+ std::mutex _lock;
+ std::condition_variable _cond;
+ bool _closed;
+ size_t _pending;
+ std::thread _thread;
+ struct JoinGuard {
+ std::thread thread;
+ ~JoinGuard() {
+ if (thread.joinable()) {
+ assert(std::this_thread::get_id() != thread.get_id());
+ thread.join();
+ }
+ }
+ };
+ void notify_start() {
+ std::lock_guard guard(_lock);
+ if (!_closed) {
+ ++_pending;
+ } else {
+ throw std::runtime_error("no new threads allowed");
+ }
+ }
+ void notify_done(std::thread thread) {
+ JoinGuard join;
+ std::unique_lock guard(_lock);
+ join.thread = std::move(_thread);
+ _thread = std::move(thread);
+ if (--_pending == 0 && _closed) {
+ _cond.notify_all();
+ }
+ }
+ auto wrap_task(auto task, std::promise<std::thread> &promise) {
+ return [future = promise.get_future(), task = std::move(task), &owner = *this]() mutable
+ {
+ auto thread = future.get();
+ assert(std::this_thread::get_id() == thread.get_id());
+ task();
+ owner.notify_done(std::move(thread));
+ };
+ }
+public:
+ AutoJoiner() : _lock(), _cond(), _closed(false), _pending(0), _thread() {}
+ ~AutoJoiner() { close_and_wait(); }
+ void start(auto task) {
+ notify_start();
+ std::promise<std::thread> promise;
+ promise.set_value(std::thread(wrap_task(std::move(task), promise)));
+ };
+ void close_and_wait() {
+ JoinGuard join;
+ std::unique_lock guard(_lock);
+ _closed = true;
+ while (_pending > 0) {
+ _cond.wait(guard);
+ }
+ std::swap(join.thread, _thread);
+ }
+};
+
+AutoJoiner &auto_joiner() {
+ static AutoJoiner obj;
+ return obj;
+}
+
struct RPC : public FRT_Invokable
{
void CallBack(FRT_RPCRequest *req);
@@ -35,7 +110,7 @@ void
RPC::CallBack(FRT_RPCRequest *req)
{
req->Detach();
- std::thread(do_callback, req).detach();
+ auto_joiner().start([req]{ do_callback(req); });
}
void
@@ -53,6 +128,7 @@ class MyApp
{
public:
int main(int argc, char **argv);
+ ~MyApp() { auto_joiner().close_and_wait(); }
};
int