diff options
author | Håvard Pettersen <havardpe@yahooinc.com> | 2023-02-24 10:50:40 +0000 |
---|---|---|
committer | Håvard Pettersen <havardpe@yahooinc.com> | 2023-02-24 11:46:32 +0000 |
commit | d21091be1f4f122aa66263b0e3907b4d3297d894 (patch) | |
tree | add4eda702345510ff8e6f857763679f08d5d729 /messagebus | |
parent | ea34220e4bc8a9d9bc598cab098e2af893688e4a (diff) |
untangle messagebus from fastos
Diffstat (limited to 'messagebus')
-rw-r--r-- | messagebus/CMakeLists.txt | 1 | ||||
-rw-r--r-- | messagebus/src/vespa/messagebus/messenger.cpp | 18 | ||||
-rw-r--r-- | messagebus/src/vespa/messagebus/messenger.h | 23 | ||||
-rw-r--r-- | messagebus/src/vespa/messagebus/network/rpcnetwork.cpp | 1 | ||||
-rw-r--r-- | messagebus/src/vespa/messagebus/testlib/slobrok.cpp | 71 | ||||
-rw-r--r-- | messagebus/src/vespa/messagebus/testlib/slobrok.h | 15 |
6 files changed, 37 insertions, 92 deletions
diff --git a/messagebus/CMakeLists.txt b/messagebus/CMakeLists.txt index 30e795cac1d..ab37173a5ea 100644 --- a/messagebus/CMakeLists.txt +++ b/messagebus/CMakeLists.txt @@ -1,7 +1,6 @@ # Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. vespa_define_module( DEPENDS - fastos vespalog config_cloudconfig vespalib diff --git a/messagebus/src/vespa/messagebus/messenger.cpp b/messagebus/src/vespa/messagebus/messenger.cpp index 056be51609f..20b39dae522 100644 --- a/messagebus/src/vespa/messagebus/messenger.cpp +++ b/messagebus/src/vespa/messagebus/messenger.cpp @@ -157,10 +157,11 @@ namespace mbus { Messenger::Messenger() : _lock(), - _pool(), + _cond(), _children(), _queue(), - _closed(false) + _closed(false), + _thread() {} Messenger::~Messenger() @@ -170,8 +171,9 @@ Messenger::~Messenger() _closed = true; } _cond.notify_all(); - - _pool.Close(); + if (_thread.joinable()) { + _thread.join(); + } std::for_each(_children.begin(), _children.end(), DeleteFunctor<ITask>()); if ( ! _queue.empty()) { LOG(warning, @@ -185,10 +187,8 @@ Messenger::~Messenger() } void -Messenger::Run(FastOS_ThreadInterface *thread, void *arg) +Messenger::run() { - (void)thread; - (void)arg; while (true) { ITask::UP task; { @@ -235,9 +235,7 @@ Messenger::discardRecurrentTasks() bool Messenger::start() { - if (_pool.NewThread(this) == nullptr) { - return false; - } + _thread = std::thread([this](){run();}); return true; } diff --git a/messagebus/src/vespa/messagebus/messenger.h b/messagebus/src/vespa/messagebus/messenger.h index 6ec42ed9c4c..86cd4bf3b7f 100644 --- a/messagebus/src/vespa/messagebus/messenger.h +++ b/messagebus/src/vespa/messagebus/messenger.h @@ -7,7 +7,8 @@ #include "reply.h" #include <vespa/vespalib/util/executor.h> #include <vespa/vespalib/util/arrayqueue.hpp> -#include <vespa/fastos/thread.h> +#include <condition_variable> +#include <mutex> namespace mbus { @@ -16,7 +17,7 @@ namespace mbus { * tasks. Tasks are enqueued using the synchronized {@link #enqueue(Task)} * method, and are run in the order they were enqueued. */ -class Messenger : public FastOS_Runnable { +class Messenger { public: /** * Defines the required interface for tasks to be posted to this worker. @@ -39,15 +40,15 @@ public: }; private: - mutable std::mutex _lock; - std::condition_variable _cond; - FastOS_ThreadPool _pool; - std::vector<ITask*> _children; - vespalib::ArrayQueue<ITask*> _queue; - bool _closed; - + mutable std::mutex _lock; + std::condition_variable _cond; + std::vector<ITask*> _children; + vespalib::ArrayQueue<ITask*> _queue; + bool _closed; + std::thread _thread; + protected: - void Run(FastOS_ThreadInterface *thread, void *arg) override; + void run(); public: Messenger(); @@ -55,7 +56,7 @@ public: /** * Frees any allocated resources. Also destroys all queued tasks. */ - ~Messenger() override; + ~Messenger(); /** * Adds a recurrent task to this that is to be run for every iteration of diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp index 3a13534220f..8ab9aa13394 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp @@ -18,7 +18,6 @@ #include <vespa/vespalib/stllike/asciistream.h> #include <vespa/vespalib/util/size_literals.h> #include <vespa/vespalib/util/stringfmt.h> -#include <vespa/fastos/thread.h> #include <thread> #include <vespa/log/log.h> diff --git a/messagebus/src/vespa/messagebus/testlib/slobrok.cpp b/messagebus/src/vespa/messagebus/testlib/slobrok.cpp index 889daf538a3..b4cd2c846aa 100644 --- a/messagebus/src/vespa/messagebus/testlib/slobrok.cpp +++ b/messagebus/src/vespa/messagebus/testlib/slobrok.cpp @@ -9,80 +9,37 @@ #include <vespa/log/log.h> LOG_SETUP(".slobrok"); -namespace { -class WaitTask : public FNET_Task -{ -private: - bool _done; - std::mutex _mon; - std::condition_variable _cond; -public: - explicit WaitTask(FNET_Scheduler *s) : FNET_Task(s), _done(false), _mon() {} - ~WaitTask() override; - void wait() { - std::unique_lock guard(_mon); - while (!_done) { - _cond.wait(guard); - } - } - - void PerformTask() override { - std::lock_guard guard(_mon); - _done = true; - _cond.notify_one(); - } -}; - -WaitTask::~WaitTask() = default; -} // namespace <unnamed> - namespace mbus { void -Slobrok::Thread::setEnv(slobrok::SBEnv *env) -{ - _env = env; -} - -void -Slobrok::Thread::Run(FastOS_ThreadInterface *, void *) -{ - if (_env->MainLoop() != 0) { - LOG_ABORT("Slobrok main failed"); - } -} - -void Slobrok::init() { slobrok::ConfigShim shim(_port); _env = std::make_unique<slobrok::SBEnv>(shim); - _thread.setEnv(_env.get()); - WaitTask wt(_env->getTransport()->GetScheduler()); - wt.ScheduleNow(); - if (_pool.NewThread(&_thread, nullptr) == nullptr) { - LOG_ABORT("Could not spawn thread"); - } - wt.wait(); + _thread = std::thread([env = _env.get()]() + { + if (env->MainLoop() != 0) { + LOG_ABORT("Slobrok main failed"); + } + }); + _env->getTransport()->sync(); int p = _env->getSupervisor()->GetListenPort(); LOG_ASSERT(p != 0 && (p == _port || _port == 0)); _port = p; } Slobrok::Slobrok() - : _pool(), - _env(), - _port(0), - _thread() + : _env(), + _port(0), + _thread() { init(); } Slobrok::Slobrok(int p) - : _pool(), - _env(), - _port(p), - _thread() + : _env(), + _port(p), + _thread() { init(); } @@ -90,7 +47,7 @@ Slobrok::Slobrok(int p) Slobrok::~Slobrok() { _env->getTransport()->ShutDown(true); - _pool.Close(); + _thread.join(); } int diff --git a/messagebus/src/vespa/messagebus/testlib/slobrok.h b/messagebus/src/vespa/messagebus/testlib/slobrok.h index 7810222f07d..ff8e9cbb89a 100644 --- a/messagebus/src/vespa/messagebus/testlib/slobrok.h +++ b/messagebus/src/vespa/messagebus/testlib/slobrok.h @@ -4,7 +4,7 @@ #include <vespa/messagebus/common.h> #include <vespa/slobrok/cfg.h> -#include <vespa/fastos/thread.h> +#include <thread> namespace slobrok { class SBEnv; @@ -15,17 +15,9 @@ namespace mbus { class Slobrok { private: - class Thread : public FastOS_Runnable { - private: - slobrok::SBEnv *_env; - public: - void setEnv(slobrok::SBEnv *env); - void Run(FastOS_ThreadInterface *, void *) override; - }; - FastOS_ThreadPool _pool; std::unique_ptr<slobrok::SBEnv> _env; - int _port; - Thread _thread; + int _port; + std::thread _thread; Slobrok(const Slobrok &); Slobrok &operator=(const Slobrok &); @@ -42,4 +34,3 @@ public: }; } // namespace mbus - |