summaryrefslogtreecommitdiffstats
path: root/messagebus
diff options
context:
space:
mode:
authorHåvard Pettersen <havardpe@yahooinc.com>2023-02-24 10:50:40 +0000
committerHåvard Pettersen <havardpe@yahooinc.com>2023-02-24 11:46:32 +0000
commitd21091be1f4f122aa66263b0e3907b4d3297d894 (patch)
treeadd4eda702345510ff8e6f857763679f08d5d729 /messagebus
parentea34220e4bc8a9d9bc598cab098e2af893688e4a (diff)
untangle messagebus from fastos
Diffstat (limited to 'messagebus')
-rw-r--r--messagebus/CMakeLists.txt1
-rw-r--r--messagebus/src/vespa/messagebus/messenger.cpp18
-rw-r--r--messagebus/src/vespa/messagebus/messenger.h23
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetwork.cpp1
-rw-r--r--messagebus/src/vespa/messagebus/testlib/slobrok.cpp71
-rw-r--r--messagebus/src/vespa/messagebus/testlib/slobrok.h15
6 files changed, 37 insertions, 92 deletions
diff --git a/messagebus/CMakeLists.txt b/messagebus/CMakeLists.txt
index 30e795cac1d..ab37173a5ea 100644
--- a/messagebus/CMakeLists.txt
+++ b/messagebus/CMakeLists.txt
@@ -1,7 +1,6 @@
# Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
vespa_define_module(
DEPENDS
- fastos
vespalog
config_cloudconfig
vespalib
diff --git a/messagebus/src/vespa/messagebus/messenger.cpp b/messagebus/src/vespa/messagebus/messenger.cpp
index 056be51609f..20b39dae522 100644
--- a/messagebus/src/vespa/messagebus/messenger.cpp
+++ b/messagebus/src/vespa/messagebus/messenger.cpp
@@ -157,10 +157,11 @@ namespace mbus {
Messenger::Messenger()
: _lock(),
- _pool(),
+ _cond(),
_children(),
_queue(),
- _closed(false)
+ _closed(false),
+ _thread()
{}
Messenger::~Messenger()
@@ -170,8 +171,9 @@ Messenger::~Messenger()
_closed = true;
}
_cond.notify_all();
-
- _pool.Close();
+ if (_thread.joinable()) {
+ _thread.join();
+ }
std::for_each(_children.begin(), _children.end(), DeleteFunctor<ITask>());
if ( ! _queue.empty()) {
LOG(warning,
@@ -185,10 +187,8 @@ Messenger::~Messenger()
}
void
-Messenger::Run(FastOS_ThreadInterface *thread, void *arg)
+Messenger::run()
{
- (void)thread;
- (void)arg;
while (true) {
ITask::UP task;
{
@@ -235,9 +235,7 @@ Messenger::discardRecurrentTasks()
bool
Messenger::start()
{
- if (_pool.NewThread(this) == nullptr) {
- return false;
- }
+ _thread = std::thread([this](){run();});
return true;
}
diff --git a/messagebus/src/vespa/messagebus/messenger.h b/messagebus/src/vespa/messagebus/messenger.h
index 6ec42ed9c4c..86cd4bf3b7f 100644
--- a/messagebus/src/vespa/messagebus/messenger.h
+++ b/messagebus/src/vespa/messagebus/messenger.h
@@ -7,7 +7,8 @@
#include "reply.h"
#include <vespa/vespalib/util/executor.h>
#include <vespa/vespalib/util/arrayqueue.hpp>
-#include <vespa/fastos/thread.h>
+#include <condition_variable>
+#include <mutex>
namespace mbus {
@@ -16,7 +17,7 @@ namespace mbus {
* tasks. Tasks are enqueued using the synchronized {@link #enqueue(Task)}
* method, and are run in the order they were enqueued.
*/
-class Messenger : public FastOS_Runnable {
+class Messenger {
public:
/**
* Defines the required interface for tasks to be posted to this worker.
@@ -39,15 +40,15 @@ public:
};
private:
- mutable std::mutex _lock;
- std::condition_variable _cond;
- FastOS_ThreadPool _pool;
- std::vector<ITask*> _children;
- vespalib::ArrayQueue<ITask*> _queue;
- bool _closed;
-
+ mutable std::mutex _lock;
+ std::condition_variable _cond;
+ std::vector<ITask*> _children;
+ vespalib::ArrayQueue<ITask*> _queue;
+ bool _closed;
+ std::thread _thread;
+
protected:
- void Run(FastOS_ThreadInterface *thread, void *arg) override;
+ void run();
public:
Messenger();
@@ -55,7 +56,7 @@ public:
/**
* Frees any allocated resources. Also destroys all queued tasks.
*/
- ~Messenger() override;
+ ~Messenger();
/**
* Adds a recurrent task to this that is to be run for every iteration of
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
index 3a13534220f..8ab9aa13394 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
@@ -18,7 +18,6 @@
#include <vespa/vespalib/stllike/asciistream.h>
#include <vespa/vespalib/util/size_literals.h>
#include <vespa/vespalib/util/stringfmt.h>
-#include <vespa/fastos/thread.h>
#include <thread>
#include <vespa/log/log.h>
diff --git a/messagebus/src/vespa/messagebus/testlib/slobrok.cpp b/messagebus/src/vespa/messagebus/testlib/slobrok.cpp
index 889daf538a3..b4cd2c846aa 100644
--- a/messagebus/src/vespa/messagebus/testlib/slobrok.cpp
+++ b/messagebus/src/vespa/messagebus/testlib/slobrok.cpp
@@ -9,80 +9,37 @@
#include <vespa/log/log.h>
LOG_SETUP(".slobrok");
-namespace {
-class WaitTask : public FNET_Task
-{
-private:
- bool _done;
- std::mutex _mon;
- std::condition_variable _cond;
-public:
- explicit WaitTask(FNET_Scheduler *s) : FNET_Task(s), _done(false), _mon() {}
- ~WaitTask() override;
- void wait() {
- std::unique_lock guard(_mon);
- while (!_done) {
- _cond.wait(guard);
- }
- }
-
- void PerformTask() override {
- std::lock_guard guard(_mon);
- _done = true;
- _cond.notify_one();
- }
-};
-
-WaitTask::~WaitTask() = default;
-} // namespace <unnamed>
-
namespace mbus {
void
-Slobrok::Thread::setEnv(slobrok::SBEnv *env)
-{
- _env = env;
-}
-
-void
-Slobrok::Thread::Run(FastOS_ThreadInterface *, void *)
-{
- if (_env->MainLoop() != 0) {
- LOG_ABORT("Slobrok main failed");
- }
-}
-
-void
Slobrok::init()
{
slobrok::ConfigShim shim(_port);
_env = std::make_unique<slobrok::SBEnv>(shim);
- _thread.setEnv(_env.get());
- WaitTask wt(_env->getTransport()->GetScheduler());
- wt.ScheduleNow();
- if (_pool.NewThread(&_thread, nullptr) == nullptr) {
- LOG_ABORT("Could not spawn thread");
- }
- wt.wait();
+ _thread = std::thread([env = _env.get()]()
+ {
+ if (env->MainLoop() != 0) {
+ LOG_ABORT("Slobrok main failed");
+ }
+ });
+ _env->getTransport()->sync();
int p = _env->getSupervisor()->GetListenPort();
LOG_ASSERT(p != 0 && (p == _port || _port == 0));
_port = p;
}
Slobrok::Slobrok()
- : _pool(),
- _env(),
- _port(0),
- _thread()
+ : _env(),
+ _port(0),
+ _thread()
{
init();
}
Slobrok::Slobrok(int p)
- : _pool(),
- _env(),
- _port(p),
- _thread()
+ : _env(),
+ _port(p),
+ _thread()
{
init();
}
@@ -90,7 +47,7 @@ Slobrok::Slobrok(int p)
Slobrok::~Slobrok()
{
_env->getTransport()->ShutDown(true);
- _pool.Close();
+ _thread.join();
}
int
diff --git a/messagebus/src/vespa/messagebus/testlib/slobrok.h b/messagebus/src/vespa/messagebus/testlib/slobrok.h
index 7810222f07d..ff8e9cbb89a 100644
--- a/messagebus/src/vespa/messagebus/testlib/slobrok.h
+++ b/messagebus/src/vespa/messagebus/testlib/slobrok.h
@@ -4,7 +4,7 @@
#include <vespa/messagebus/common.h>
#include <vespa/slobrok/cfg.h>
-#include <vespa/fastos/thread.h>
+#include <thread>
namespace slobrok {
class SBEnv;
@@ -15,17 +15,9 @@ namespace mbus {
class Slobrok
{
private:
- class Thread : public FastOS_Runnable {
- private:
- slobrok::SBEnv *_env;
- public:
- void setEnv(slobrok::SBEnv *env);
- void Run(FastOS_ThreadInterface *, void *) override;
- };
- FastOS_ThreadPool _pool;
std::unique_ptr<slobrok::SBEnv> _env;
- int _port;
- Thread _thread;
+ int _port;
+ std::thread _thread;
Slobrok(const Slobrok &);
Slobrok &operator=(const Slobrok &);
@@ -42,4 +34,3 @@ public:
};
} // namespace mbus
-