summaryrefslogtreecommitdiffstats
path: root/vbench
diff options
context:
space:
mode:
authorHåvard Pettersen <havardpe@yahooinc.com>2023-02-14 10:15:12 +0000
committerHåvard Pettersen <havardpe@yahooinc.com>2023-02-14 15:40:46 +0000
commit98e408c667e3dcda3dc177594ffa4c4d0d99a33e (patch)
tree8086fedd3c4c11ac51fa86f12f68cfe61095ee55 /vbench
parent251bd6d0b21aa2ddac8ef82338f40d37ffc4990f (diff)
stop using fastos thread more places
- also stop using std::jthread - remove Active and Joinable interfaces - remove stop, stopped and slumber - remove currentThread - make start function static - override start for Runnable w/init or custom function - explicit stop/slumber where needed
Diffstat (limited to 'vbench')
-rw-r--r--vbench/src/apps/vbench/vbench.cpp3
-rw-r--r--vbench/src/tests/dispatcher/dispatcher_test.cpp6
-rw-r--r--vbench/src/tests/handler_thread/handler_thread_test.cpp3
-rw-r--r--vbench/src/tests/timer/timer_test.cpp3
-rw-r--r--vbench/src/vbench/core/dispatcher.hpp3
-rw-r--r--vbench/src/vbench/core/handler_thread.h7
-rw-r--r--vbench/src/vbench/core/handler_thread.hpp4
-rw-r--r--vbench/src/vbench/vbench/request_scheduler.cpp24
-rw-r--r--vbench/src/vbench/vbench/request_scheduler.h17
-rw-r--r--vbench/src/vbench/vbench/vbench.cpp5
-rw-r--r--vbench/src/vbench/vbench/vbench.h2
-rw-r--r--vbench/src/vbench/vbench/worker.cpp4
-rw-r--r--vbench/src/vbench/vbench/worker.h6
13 files changed, 49 insertions, 38 deletions
diff --git a/vbench/src/apps/vbench/vbench.cpp b/vbench/src/apps/vbench/vbench.cpp
index ffea64c6034..b5c2897207f 100644
--- a/vbench/src/apps/vbench/vbench.cpp
+++ b/vbench/src/apps/vbench/vbench.cpp
@@ -44,8 +44,7 @@ int run(const std::string &cfg_name) {
VBench vbench(cfg);
NotifyDone notify(done);
vespalib::RunnablePair runBoth(vbench, notify);
- vespalib::Thread thread(runBoth, vbench_thread);
- thread.start();
+ auto thread = vespalib::Thread::start(runBoth, vbench_thread);
while (!SIG::INT.check() && !SIG::TERM.check() && !done.await(1s)) {}
if (!done.await(vespalib::duration::zero())) {
vbench.abort();
diff --git a/vbench/src/tests/dispatcher/dispatcher_test.cpp b/vbench/src/tests/dispatcher/dispatcher_test.cpp
index 85879feb0ee..6a3fb8d0c7c 100644
--- a/vbench/src/tests/dispatcher/dispatcher_test.cpp
+++ b/vbench/src/tests/dispatcher/dispatcher_test.cpp
@@ -30,11 +30,9 @@ TEST("dispatcher") {
Dispatcher<int> dispatcher(dropped);
Fetcher fetcher1(dispatcher, handler1);
Fetcher fetcher2(dispatcher, handler2);
- vespalib::Thread thread1(fetcher1, fetcher1_thread);
- vespalib::Thread thread2(fetcher2, fetcher2_thread);
- thread1.start();
+ auto thread1 = vespalib::Thread::start(fetcher1, fetcher1_thread);
EXPECT_TRUE(dispatcher.waitForThreads(1, 512));
- thread2.start();
+ auto thread2 = vespalib::Thread::start(fetcher2, fetcher2_thread);
EXPECT_TRUE(dispatcher.waitForThreads(2, 512));
EXPECT_EQUAL(-1, dropped.value);
EXPECT_EQUAL(-1, handler1.value);
diff --git a/vbench/src/tests/handler_thread/handler_thread_test.cpp b/vbench/src/tests/handler_thread/handler_thread_test.cpp
index 97a12e82ac8..497b7db2883 100644
--- a/vbench/src/tests/handler_thread/handler_thread_test.cpp
+++ b/vbench/src/tests/handler_thread/handler_thread_test.cpp
@@ -1,6 +1,7 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <vespa/vespalib/testkit/testapp.h>
#include <vbench/test/all.h>
+#include <vespa/vespalib/util/time.h>
using namespace vbench;
@@ -9,7 +10,7 @@ struct MyHandler : Handler<int> {
~MyHandler() override;
void handle(std::unique_ptr<int> value) override {
values.push_back(*value);
- vespalib::Thread::sleep(10); // for improved coverage
+ std::this_thread::sleep_for(10ms);
}
};
diff --git a/vbench/src/tests/timer/timer_test.cpp b/vbench/src/tests/timer/timer_test.cpp
index 43e7b76caa0..eda0564d2d8 100644
--- a/vbench/src/tests/timer/timer_test.cpp
+++ b/vbench/src/tests/timer/timer_test.cpp
@@ -1,13 +1,14 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <vespa/vespalib/testkit/testapp.h>
#include <vbench/test/all.h>
+#include <vespa/vespalib/util/time.h>
using namespace vbench;
IGNORE_TEST("timer") {
Timer timer;
EXPECT_APPROX(0.0, timer.sample(), 0.1);
- vespalib::Thread::sleep(1000);
+ std::this_thread::sleep_for(1000ms);
EXPECT_APPROX(1.0, timer.sample(), 0.1);
timer.reset();
EXPECT_APPROX(0.0, timer.sample(), 0.1);
diff --git a/vbench/src/vbench/core/dispatcher.hpp b/vbench/src/vbench/core/dispatcher.hpp
index aa5afcd6d67..572e9f3381d 100644
--- a/vbench/src/vbench/core/dispatcher.hpp
+++ b/vbench/src/vbench/core/dispatcher.hpp
@@ -1,6 +1,7 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <vespa/vespalib/util/thread.h>
+#include <vespa/vespalib/util/time.h>
namespace vbench {
@@ -22,7 +23,7 @@ Dispatcher<T>::waitForThreads(size_t threads, size_t pollCnt) const
{
for (size_t i = 0; i < pollCnt; ++i) {
if (i != 0) {
- vespalib::Thread::sleep(20);
+ std::this_thread::sleep_for(20ms);
}
{
std::lock_guard guard(_lock);
diff --git a/vbench/src/vbench/core/handler_thread.h b/vbench/src/vbench/core/handler_thread.h
index 402ecbeb0dc..8ece1389dfc 100644
--- a/vbench/src/vbench/core/handler_thread.h
+++ b/vbench/src/vbench/core/handler_thread.h
@@ -6,7 +6,7 @@
#include <vespa/vespalib/util/arrayqueue.hpp>
#include <vespa/vespalib/util/thread.h>
#include <vespa/vespalib/util/runnable.h>
-#include <vespa/vespalib/util/joinable.h>
+#include <condition_variable>
namespace vbench {
@@ -19,8 +19,7 @@ namespace vbench {
**/
template <typename T>
class HandlerThread : public Handler<T>,
- public vespalib::Runnable,
- public vespalib::Joinable
+ public vespalib::Runnable
{
private:
std::mutex _lock;
@@ -36,7 +35,7 @@ public:
HandlerThread(Handler<T> &next, init_fun_t init_fun);
~HandlerThread();
void handle(std::unique_ptr<T> obj) override;
- void join() override;
+ void join();
};
} // namespace vbench
diff --git a/vbench/src/vbench/core/handler_thread.hpp b/vbench/src/vbench/core/handler_thread.hpp
index 56cc0a7771d..1a99861ea81 100644
--- a/vbench/src/vbench/core/handler_thread.hpp
+++ b/vbench/src/vbench/core/handler_thread.hpp
@@ -28,10 +28,10 @@ HandlerThread<T>::HandlerThread(Handler<T> &next, init_fun_t init_fun)
_cond(),
_queue(),
_next(next),
- _thread(*this, init_fun),
+ _thread(),
_done(false)
{
- _thread.start();
+ _thread = vespalib::Thread::start(*this, init_fun);
}
template <typename T>
diff --git a/vbench/src/vbench/vbench/request_scheduler.cpp b/vbench/src/vbench/vbench/request_scheduler.cpp
index 95d29181b1f..cde31ec07b8 100644
--- a/vbench/src/vbench/vbench/request_scheduler.cpp
+++ b/vbench/src/vbench/vbench/request_scheduler.cpp
@@ -2,6 +2,7 @@
#include "request_scheduler.h"
#include <vbench/core/timer.h>
+#include <vespa/vespalib/util/time.h>
namespace vbench {
@@ -13,14 +14,18 @@ RequestScheduler::run()
{
double sleepTime;
std::vector<Request::UP> list;
- vespalib::Thread &thread = vespalib::Thread::currentThread();
while (_queue.extract(_timer.sample(), list, sleepTime)) {
for (size_t i = 0; i < list.size(); ++i) {
Request::UP request = Request::UP(list[i].release());
_dispatcher.handle(std::move(request));
}
list.clear();
- thread.slumber(sleepTime);
+ {
+ auto guard = std::unique_lock(_lock);
+ if (_may_slumber) {
+ _cond.wait_for(guard, std::chrono::duration<double,std::milli>(sleepTime));
+ }
+ }
}
}
@@ -30,9 +35,12 @@ RequestScheduler::RequestScheduler(CryptoEngine::SP crypto, Handler<Request> &ne
_queue(10.0, 0.020),
_droppedTagger(_proxy),
_dispatcher(_droppedTagger),
- _thread(*this, vbench_request_scheduler_thread),
+ _thread(),
_connectionPool(std::move(crypto), _timer),
- _workers()
+ _workers(),
+ _lock(),
+ _cond(),
+ _may_slumber(true)
{
for (size_t i = 0; i < numWorkers; ++i) {
_workers.push_back(std::make_unique<Worker>(_dispatcher, _proxy, _connectionPool, _timer));
@@ -45,7 +53,11 @@ RequestScheduler::abort()
{
_queue.close();
_queue.discard();
- _thread.stop();
+ {
+ auto guard = std::lock_guard(_lock);
+ _may_slumber = false;
+ _cond.notify_all();
+ }
}
void
@@ -59,7 +71,7 @@ void
RequestScheduler::start()
{
_timer.reset();
- _thread.start();
+ _thread = vespalib::Thread::start(*this, vbench_request_scheduler_thread);
}
RequestScheduler &
diff --git a/vbench/src/vbench/vbench/request_scheduler.h b/vbench/src/vbench/vbench/request_scheduler.h
index a0fb1eda4e7..b1d525eb691 100644
--- a/vbench/src/vbench/vbench/request_scheduler.h
+++ b/vbench/src/vbench/vbench/request_scheduler.h
@@ -7,7 +7,8 @@
#include <vbench/core/time_queue.h>
#include <vbench/core/dispatcher.h>
#include <vbench/core/handler_thread.h>
-#include <vespa/vespalib/util/active.h>
+#include <mutex>
+#include <condition_variable>
namespace vbench {
@@ -17,8 +18,7 @@ namespace vbench {
* with.
**/
class RequestScheduler : public Handler<Request>,
- public vespalib::Runnable,
- public vespalib::Active
+ public vespalib::Runnable
{
private:
Timer _timer;
@@ -29,7 +29,10 @@ private:
vespalib::Thread _thread;
HttpConnectionPool _connectionPool;
std::vector<Worker::UP> _workers;
-
+ std::mutex _lock;
+ std::condition_variable _cond;
+ bool _may_slumber;
+
void run() override;
public:
using UP = std::unique_ptr<RequestScheduler>;
@@ -37,9 +40,9 @@ public:
RequestScheduler(CryptoEngine::SP crypto, Handler<Request> &next, size_t numWorkers);
void abort();
void handle(Request::UP request) override;
- void start() override;
- RequestScheduler &stop() override;
- void join() override;
+ void start();
+ RequestScheduler &stop();
+ void join();
};
} // namespace vbench
diff --git a/vbench/src/vbench/vbench/vbench.cpp b/vbench/src/vbench/vbench/vbench.cpp
index 9a5adad262e..8a8bceccefb 100644
--- a/vbench/src/vbench/vbench/vbench.cpp
+++ b/vbench/src/vbench/vbench/vbench.cpp
@@ -78,7 +78,6 @@ VBench::VBench(const vespalib::Slime &cfg)
}
inputChain->generator = _factory.createGenerator(generator, *inputChain->taggers.back());
if (inputChain->generator.get() != 0) {
- inputChain->thread.reset(new vespalib::Thread(*inputChain->generator, vbench_inputchain_generator));
_inputs.push_back(std::move(inputChain));
}
}
@@ -101,10 +100,10 @@ VBench::run()
{
_scheduler->start();
for (size_t i = 0; i < _inputs.size(); ++i) {
- _inputs[i]->thread->start();
+ _inputs[i]->thread = vespalib::Thread::start(*_inputs[i]->generator, vbench_inputchain_generator);
}
for (size_t i = 0; i < _inputs.size(); ++i) {
- _inputs[i]->thread->join();
+ _inputs[i]->thread.join();
}
_scheduler->stop().join();
for (size_t i = 0; i < _inputs.size(); ++i) {
diff --git a/vbench/src/vbench/vbench/vbench.h b/vbench/src/vbench/vbench/vbench.h
index dbb46e72800..2b7fcf0cd88 100644
--- a/vbench/src/vbench/vbench/vbench.h
+++ b/vbench/src/vbench/vbench/vbench.h
@@ -26,7 +26,7 @@ private:
using UP = std::unique_ptr<InputChain>;
std::vector<Tagger::UP> taggers;
Generator::UP generator;
- std::unique_ptr<vespalib::Thread> thread;
+ vespalib::Thread thread;
};
NativeFactory _factory;
std::vector<Analyzer::UP> _analyzers;
diff --git a/vbench/src/vbench/vbench/worker.cpp b/vbench/src/vbench/vbench/worker.cpp
index afccc7de39f..eabd17ae73f 100644
--- a/vbench/src/vbench/vbench/worker.cpp
+++ b/vbench/src/vbench/vbench/worker.cpp
@@ -24,13 +24,13 @@ Worker::run()
Worker::Worker(Provider<Request> &provider, Handler<Request> &next,
HttpConnectionPool &pool, Timer &timer)
- : _thread(*this, vbench_worker_thread),
+ : _thread(),
_provider(provider),
_next(next),
_pool(pool),
_timer(timer)
{
- _thread.start();
+ _thread = vespalib::Thread::start(*this, vbench_worker_thread);
}
} // namespace vbench
diff --git a/vbench/src/vbench/vbench/worker.h b/vbench/src/vbench/vbench/worker.h
index 6594a4f3dd6..d2bcfae637b 100644
--- a/vbench/src/vbench/vbench/worker.h
+++ b/vbench/src/vbench/vbench/worker.h
@@ -8,7 +8,6 @@
#include <vbench/http/http_connection_pool.h>
#include <vespa/vespalib/util/runnable.h>
#include <vespa/vespalib/util/thread.h>
-#include <vespa/vespalib/util/joinable.h>
namespace vbench {
@@ -18,8 +17,7 @@ namespace vbench {
* internal thread that will stop when the request provider starts
* handing out empty requests.
**/
-class Worker : public vespalib::Runnable,
- public vespalib::Joinable
+class Worker : public vespalib::Runnable
{
private:
vespalib::Thread _thread;
@@ -33,7 +31,7 @@ public:
using UP = std::unique_ptr<Worker>;
Worker(Provider<Request> &provider, Handler<Request> &next,
HttpConnectionPool &pool, Timer &timer);
- void join() override { _thread.join(); }
+ void join() { _thread.join(); }
};
} // namespace vbench