diff options
author | Håvard Pettersen <havardpe@yahooinc.com> | 2023-02-14 10:15:12 +0000 |
---|---|---|
committer | Håvard Pettersen <havardpe@yahooinc.com> | 2023-02-14 15:40:46 +0000 |
commit | 98e408c667e3dcda3dc177594ffa4c4d0d99a33e (patch) | |
tree | 8086fedd3c4c11ac51fa86f12f68cfe61095ee55 /vbench | |
parent | 251bd6d0b21aa2ddac8ef82338f40d37ffc4990f (diff) |
stop using fastos thread more places
- also stop using std::jthread
- remove Active and Joinable interfaces
- remove stop, stopped and slumber
- remove currentThread
- make start function static
- override start for Runnable w/init or custom function
- explicit stop/slumber where needed
Diffstat (limited to 'vbench')
-rw-r--r-- | vbench/src/apps/vbench/vbench.cpp | 3 | ||||
-rw-r--r-- | vbench/src/tests/dispatcher/dispatcher_test.cpp | 6 | ||||
-rw-r--r-- | vbench/src/tests/handler_thread/handler_thread_test.cpp | 3 | ||||
-rw-r--r-- | vbench/src/tests/timer/timer_test.cpp | 3 | ||||
-rw-r--r-- | vbench/src/vbench/core/dispatcher.hpp | 3 | ||||
-rw-r--r-- | vbench/src/vbench/core/handler_thread.h | 7 | ||||
-rw-r--r-- | vbench/src/vbench/core/handler_thread.hpp | 4 | ||||
-rw-r--r-- | vbench/src/vbench/vbench/request_scheduler.cpp | 24 | ||||
-rw-r--r-- | vbench/src/vbench/vbench/request_scheduler.h | 17 | ||||
-rw-r--r-- | vbench/src/vbench/vbench/vbench.cpp | 5 | ||||
-rw-r--r-- | vbench/src/vbench/vbench/vbench.h | 2 | ||||
-rw-r--r-- | vbench/src/vbench/vbench/worker.cpp | 4 | ||||
-rw-r--r-- | vbench/src/vbench/vbench/worker.h | 6 |
13 files changed, 49 insertions, 38 deletions
diff --git a/vbench/src/apps/vbench/vbench.cpp b/vbench/src/apps/vbench/vbench.cpp index ffea64c6034..b5c2897207f 100644 --- a/vbench/src/apps/vbench/vbench.cpp +++ b/vbench/src/apps/vbench/vbench.cpp @@ -44,8 +44,7 @@ int run(const std::string &cfg_name) { VBench vbench(cfg); NotifyDone notify(done); vespalib::RunnablePair runBoth(vbench, notify); - vespalib::Thread thread(runBoth, vbench_thread); - thread.start(); + auto thread = vespalib::Thread::start(runBoth, vbench_thread); while (!SIG::INT.check() && !SIG::TERM.check() && !done.await(1s)) {} if (!done.await(vespalib::duration::zero())) { vbench.abort(); diff --git a/vbench/src/tests/dispatcher/dispatcher_test.cpp b/vbench/src/tests/dispatcher/dispatcher_test.cpp index 85879feb0ee..6a3fb8d0c7c 100644 --- a/vbench/src/tests/dispatcher/dispatcher_test.cpp +++ b/vbench/src/tests/dispatcher/dispatcher_test.cpp @@ -30,11 +30,9 @@ TEST("dispatcher") { Dispatcher<int> dispatcher(dropped); Fetcher fetcher1(dispatcher, handler1); Fetcher fetcher2(dispatcher, handler2); - vespalib::Thread thread1(fetcher1, fetcher1_thread); - vespalib::Thread thread2(fetcher2, fetcher2_thread); - thread1.start(); + auto thread1 = vespalib::Thread::start(fetcher1, fetcher1_thread); EXPECT_TRUE(dispatcher.waitForThreads(1, 512)); - thread2.start(); + auto thread2 = vespalib::Thread::start(fetcher2, fetcher2_thread); EXPECT_TRUE(dispatcher.waitForThreads(2, 512)); EXPECT_EQUAL(-1, dropped.value); EXPECT_EQUAL(-1, handler1.value); diff --git a/vbench/src/tests/handler_thread/handler_thread_test.cpp b/vbench/src/tests/handler_thread/handler_thread_test.cpp index 97a12e82ac8..497b7db2883 100644 --- a/vbench/src/tests/handler_thread/handler_thread_test.cpp +++ b/vbench/src/tests/handler_thread/handler_thread_test.cpp @@ -1,6 +1,7 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include <vespa/vespalib/testkit/testapp.h> #include <vbench/test/all.h> +#include <vespa/vespalib/util/time.h> using namespace vbench; @@ -9,7 +10,7 @@ struct MyHandler : Handler<int> { ~MyHandler() override; void handle(std::unique_ptr<int> value) override { values.push_back(*value); - vespalib::Thread::sleep(10); // for improved coverage + std::this_thread::sleep_for(10ms); } }; diff --git a/vbench/src/tests/timer/timer_test.cpp b/vbench/src/tests/timer/timer_test.cpp index 43e7b76caa0..eda0564d2d8 100644 --- a/vbench/src/tests/timer/timer_test.cpp +++ b/vbench/src/tests/timer/timer_test.cpp @@ -1,13 +1,14 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include <vespa/vespalib/testkit/testapp.h> #include <vbench/test/all.h> +#include <vespa/vespalib/util/time.h> using namespace vbench; IGNORE_TEST("timer") { Timer timer; EXPECT_APPROX(0.0, timer.sample(), 0.1); - vespalib::Thread::sleep(1000); + std::this_thread::sleep_for(1000ms); EXPECT_APPROX(1.0, timer.sample(), 0.1); timer.reset(); EXPECT_APPROX(0.0, timer.sample(), 0.1); diff --git a/vbench/src/vbench/core/dispatcher.hpp b/vbench/src/vbench/core/dispatcher.hpp index aa5afcd6d67..572e9f3381d 100644 --- a/vbench/src/vbench/core/dispatcher.hpp +++ b/vbench/src/vbench/core/dispatcher.hpp @@ -1,6 +1,7 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include <vespa/vespalib/util/thread.h> +#include <vespa/vespalib/util/time.h> namespace vbench { @@ -22,7 +23,7 @@ Dispatcher<T>::waitForThreads(size_t threads, size_t pollCnt) const { for (size_t i = 0; i < pollCnt; ++i) { if (i != 0) { - vespalib::Thread::sleep(20); + std::this_thread::sleep_for(20ms); } { std::lock_guard guard(_lock); diff --git a/vbench/src/vbench/core/handler_thread.h b/vbench/src/vbench/core/handler_thread.h index 402ecbeb0dc..8ece1389dfc 100644 --- a/vbench/src/vbench/core/handler_thread.h +++ b/vbench/src/vbench/core/handler_thread.h @@ -6,7 +6,7 @@ #include <vespa/vespalib/util/arrayqueue.hpp> #include <vespa/vespalib/util/thread.h> #include <vespa/vespalib/util/runnable.h> -#include <vespa/vespalib/util/joinable.h> +#include <condition_variable> namespace vbench { @@ -19,8 +19,7 @@ namespace vbench { **/ template <typename T> class HandlerThread : public Handler<T>, - public vespalib::Runnable, - public vespalib::Joinable + public vespalib::Runnable { private: std::mutex _lock; @@ -36,7 +35,7 @@ public: HandlerThread(Handler<T> &next, init_fun_t init_fun); ~HandlerThread(); void handle(std::unique_ptr<T> obj) override; - void join() override; + void join(); }; } // namespace vbench diff --git a/vbench/src/vbench/core/handler_thread.hpp b/vbench/src/vbench/core/handler_thread.hpp index 56cc0a7771d..1a99861ea81 100644 --- a/vbench/src/vbench/core/handler_thread.hpp +++ b/vbench/src/vbench/core/handler_thread.hpp @@ -28,10 +28,10 @@ HandlerThread<T>::HandlerThread(Handler<T> &next, init_fun_t init_fun) _cond(), _queue(), _next(next), - _thread(*this, init_fun), + _thread(), _done(false) { - _thread.start(); + _thread = vespalib::Thread::start(*this, init_fun); } template <typename T> diff --git a/vbench/src/vbench/vbench/request_scheduler.cpp b/vbench/src/vbench/vbench/request_scheduler.cpp index 95d29181b1f..cde31ec07b8 100644 --- a/vbench/src/vbench/vbench/request_scheduler.cpp +++ b/vbench/src/vbench/vbench/request_scheduler.cpp @@ -2,6 +2,7 @@ #include "request_scheduler.h" #include <vbench/core/timer.h> +#include <vespa/vespalib/util/time.h> namespace vbench { @@ -13,14 +14,18 @@ RequestScheduler::run() { double sleepTime; std::vector<Request::UP> list; - vespalib::Thread &thread = vespalib::Thread::currentThread(); while (_queue.extract(_timer.sample(), list, sleepTime)) { for (size_t i = 0; i < list.size(); ++i) { Request::UP request = Request::UP(list[i].release()); _dispatcher.handle(std::move(request)); } list.clear(); - thread.slumber(sleepTime); + { + auto guard = std::unique_lock(_lock); + if (_may_slumber) { + _cond.wait_for(guard, std::chrono::duration<double,std::milli>(sleepTime)); + } + } } } @@ -30,9 +35,12 @@ RequestScheduler::RequestScheduler(CryptoEngine::SP crypto, Handler<Request> &ne _queue(10.0, 0.020), _droppedTagger(_proxy), _dispatcher(_droppedTagger), - _thread(*this, vbench_request_scheduler_thread), + _thread(), _connectionPool(std::move(crypto), _timer), - _workers() + _workers(), + _lock(), + _cond(), + _may_slumber(true) { for (size_t i = 0; i < numWorkers; ++i) { _workers.push_back(std::make_unique<Worker>(_dispatcher, _proxy, _connectionPool, _timer)); @@ -45,7 +53,11 @@ RequestScheduler::abort() { _queue.close(); _queue.discard(); - _thread.stop(); + { + auto guard = std::lock_guard(_lock); + _may_slumber = false; + _cond.notify_all(); + } } void @@ -59,7 +71,7 @@ void RequestScheduler::start() { _timer.reset(); - _thread.start(); + _thread = vespalib::Thread::start(*this, vbench_request_scheduler_thread); } RequestScheduler & diff --git a/vbench/src/vbench/vbench/request_scheduler.h b/vbench/src/vbench/vbench/request_scheduler.h index a0fb1eda4e7..b1d525eb691 100644 --- a/vbench/src/vbench/vbench/request_scheduler.h +++ b/vbench/src/vbench/vbench/request_scheduler.h @@ -7,7 +7,8 @@ #include <vbench/core/time_queue.h> #include <vbench/core/dispatcher.h> #include <vbench/core/handler_thread.h> -#include <vespa/vespalib/util/active.h> +#include <mutex> +#include <condition_variable> namespace vbench { @@ -17,8 +18,7 @@ namespace vbench { * with. **/ class RequestScheduler : public Handler<Request>, - public vespalib::Runnable, - public vespalib::Active + public vespalib::Runnable { private: Timer _timer; @@ -29,7 +29,10 @@ private: vespalib::Thread _thread; HttpConnectionPool _connectionPool; std::vector<Worker::UP> _workers; - + std::mutex _lock; + std::condition_variable _cond; + bool _may_slumber; + void run() override; public: using UP = std::unique_ptr<RequestScheduler>; @@ -37,9 +40,9 @@ public: RequestScheduler(CryptoEngine::SP crypto, Handler<Request> &next, size_t numWorkers); void abort(); void handle(Request::UP request) override; - void start() override; - RequestScheduler &stop() override; - void join() override; + void start(); + RequestScheduler &stop(); + void join(); }; } // namespace vbench diff --git a/vbench/src/vbench/vbench/vbench.cpp b/vbench/src/vbench/vbench/vbench.cpp index 9a5adad262e..8a8bceccefb 100644 --- a/vbench/src/vbench/vbench/vbench.cpp +++ b/vbench/src/vbench/vbench/vbench.cpp @@ -78,7 +78,6 @@ VBench::VBench(const vespalib::Slime &cfg) } inputChain->generator = _factory.createGenerator(generator, *inputChain->taggers.back()); if (inputChain->generator.get() != 0) { - inputChain->thread.reset(new vespalib::Thread(*inputChain->generator, vbench_inputchain_generator)); _inputs.push_back(std::move(inputChain)); } } @@ -101,10 +100,10 @@ VBench::run() { _scheduler->start(); for (size_t i = 0; i < _inputs.size(); ++i) { - _inputs[i]->thread->start(); + _inputs[i]->thread = vespalib::Thread::start(*_inputs[i]->generator, vbench_inputchain_generator); } for (size_t i = 0; i < _inputs.size(); ++i) { - _inputs[i]->thread->join(); + _inputs[i]->thread.join(); } _scheduler->stop().join(); for (size_t i = 0; i < _inputs.size(); ++i) { diff --git a/vbench/src/vbench/vbench/vbench.h b/vbench/src/vbench/vbench/vbench.h index dbb46e72800..2b7fcf0cd88 100644 --- a/vbench/src/vbench/vbench/vbench.h +++ b/vbench/src/vbench/vbench/vbench.h @@ -26,7 +26,7 @@ private: using UP = std::unique_ptr<InputChain>; std::vector<Tagger::UP> taggers; Generator::UP generator; - std::unique_ptr<vespalib::Thread> thread; + vespalib::Thread thread; }; NativeFactory _factory; std::vector<Analyzer::UP> _analyzers; diff --git a/vbench/src/vbench/vbench/worker.cpp b/vbench/src/vbench/vbench/worker.cpp index afccc7de39f..eabd17ae73f 100644 --- a/vbench/src/vbench/vbench/worker.cpp +++ b/vbench/src/vbench/vbench/worker.cpp @@ -24,13 +24,13 @@ Worker::run() Worker::Worker(Provider<Request> &provider, Handler<Request> &next, HttpConnectionPool &pool, Timer &timer) - : _thread(*this, vbench_worker_thread), + : _thread(), _provider(provider), _next(next), _pool(pool), _timer(timer) { - _thread.start(); + _thread = vespalib::Thread::start(*this, vbench_worker_thread); } } // namespace vbench diff --git a/vbench/src/vbench/vbench/worker.h b/vbench/src/vbench/vbench/worker.h index 6594a4f3dd6..d2bcfae637b 100644 --- a/vbench/src/vbench/vbench/worker.h +++ b/vbench/src/vbench/vbench/worker.h @@ -8,7 +8,6 @@ #include <vbench/http/http_connection_pool.h> #include <vespa/vespalib/util/runnable.h> #include <vespa/vespalib/util/thread.h> -#include <vespa/vespalib/util/joinable.h> namespace vbench { @@ -18,8 +17,7 @@ namespace vbench { * internal thread that will stop when the request provider starts * handing out empty requests. **/ -class Worker : public vespalib::Runnable, - public vespalib::Joinable +class Worker : public vespalib::Runnable { private: vespalib::Thread _thread; @@ -33,7 +31,7 @@ public: using UP = std::unique_ptr<Worker>; Worker(Provider<Request> &provider, Handler<Request> &next, HttpConnectionPool &pool, Timer &timer); - void join() override { _thread.join(); } + void join() { _thread.join(); } }; } // namespace vbench |