summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGeir Storli <geirst@yahooinc.com>2023-02-24 17:15:11 +0100
committerGitHub <noreply@github.com>2023-02-24 17:15:11 +0100
commit5b1995344966b854ddbdf3810c3f5614c30bea2f (patch)
treecd5444d6c993e940ab0a274cfce31dfc80793714
parentf82bc56396e7251aca9c46a067d189afddb077cc (diff)
parenta907c35031c5a09fb2fc76080f273ff95e663e65 (diff)
Merge pull request #26175 from vespa-engine/havardpe/avoid-fastos-thread-in-storage
avoid using fastos thread in storage
-rw-r--r--storage/src/tests/common/dummystoragelink.h2
-rw-r--r--storage/src/tests/storageserver/statereportertest.cpp4
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe_pool.cpp9
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe_pool.h8
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe_thread.cpp2
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe_thread.h5
-rw-r--r--storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp1
-rw-r--r--vespalib/src/tests/thread/thread_test.cpp8
-rw-r--r--vespalib/src/vespa/vespalib/util/thread.h6
9 files changed, 21 insertions, 24 deletions
diff --git a/storage/src/tests/common/dummystoragelink.h b/storage/src/tests/common/dummystoragelink.h
index e8ccc38df76..8da92917c08 100644
--- a/storage/src/tests/common/dummystoragelink.h
+++ b/storage/src/tests/common/dummystoragelink.h
@@ -11,8 +11,6 @@
#include <vespa/storage/common/bucketmessages.h>
#include <vespa/storageapi/message/internal.h>
-class FastOS_ThreadPool;
-
namespace storage {
class DummyStorageLink : public StorageLink {
diff --git a/storage/src/tests/storageserver/statereportertest.cpp b/storage/src/tests/storageserver/statereportertest.cpp
index 4788ed0230c..e9b2f04eb04 100644
--- a/storage/src/tests/storageserver/statereportertest.cpp
+++ b/storage/src/tests/storageserver/statereportertest.cpp
@@ -31,7 +31,6 @@ public:
};
struct StateReporterTest : Test {
- FastOS_ThreadPool _threadPool;
framework::defaultimplementation::FakeClock* _clock;
std::unique_ptr<TestServiceLayerApp> _node;
std::unique_ptr<DummyStorageLink> _top;
@@ -62,8 +61,7 @@ struct MetricClock : public metrics::MetricManager::Timer
}
StateReporterTest::StateReporterTest()
- : _threadPool(),
- _clock(nullptr),
+ : _clock(nullptr),
_top(),
_stateReporter()
{
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_pool.cpp b/storage/src/vespa/storage/distributor/distributor_stripe_pool.cpp
index 26ca8963783..ceadd20baca 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe_pool.cpp
+++ b/storage/src/vespa/storage/distributor/distributor_stripe_pool.cpp
@@ -8,8 +8,7 @@
namespace storage::distributor {
DistributorStripePool::DistributorStripePool(bool test_mode, PrivateCtorTag)
- : _thread_pool(std::make_unique<FastOS_ThreadPool>()),
- _n_stripe_bits(0),
+ : _n_stripe_bits(0),
_stripes(),
_threads(),
_mutex(),
@@ -119,7 +118,7 @@ void DistributorStripePool::start(const std::vector<TickableStripe*>& stripes) {
}
std::unique_lock lock(_mutex); // Ensure _threads is visible to all started threads
for (auto& s : _stripes) {
- _threads.emplace_back(_thread_pool->NewThread(s.get()));
+ _threads.start([ptr = s.get()](){ ptr->run(); });
}
}
@@ -131,9 +130,7 @@ void DistributorStripePool::stop_and_join() {
for (auto& s : _stripes) {
s->signal_should_stop();
}
- for (auto* t : _threads) {
- t->Join();
- }
+ _threads.join();
}
void DistributorStripePool::set_tick_wait_duration(vespalib::duration new_tick_wait_duration) noexcept {
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_pool.h b/storage/src/vespa/storage/distributor/distributor_stripe_pool.h
index 00f5f57edf9..6ac95c27b76 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe_pool.h
+++ b/storage/src/vespa/storage/distributor/distributor_stripe_pool.h
@@ -2,14 +2,12 @@
#pragma once
#include <vespa/vespalib/util/time.h>
+#include <vespa/vespalib/util/thread.h>
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <vector>
-class FastOS_ThreadInterface;
-class FastOS_ThreadPool;
-
namespace storage::distributor {
class DistributorStripeThread;
@@ -37,12 +35,10 @@ class TickableStripe;
*/
class DistributorStripePool {
using StripeVector = std::vector<std::unique_ptr<DistributorStripeThread>>;
- using NativeThreadVector = std::vector<FastOS_ThreadInterface*>;
- std::unique_ptr<FastOS_ThreadPool> _thread_pool;
uint8_t _n_stripe_bits;
StripeVector _stripes;
- NativeThreadVector _threads;
+ vespalib::ThreadPool _threads;
std::mutex _mutex;
std::condition_variable _parker_cond;
size_t _parked_threads; // Must be protected by _park_mutex
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_thread.cpp b/storage/src/vespa/storage/distributor/distributor_stripe_thread.cpp
index 8f37dbbbf5d..72854d9af75 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe_thread.cpp
+++ b/storage/src/vespa/storage/distributor/distributor_stripe_thread.cpp
@@ -23,7 +23,7 @@ DistributorStripeThread::DistributorStripeThread(TickableStripe& stripe,
DistributorStripeThread::~DistributorStripeThread() = default;
-void DistributorStripeThread::Run(FastOS_ThreadInterface*, void*) {
+void DistributorStripeThread::run() {
uint32_t tick_waits_inhibited = 0;
while (!should_stop_thread_relaxed()) {
while (should_park_relaxed()) {
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_thread.h b/storage/src/vespa/storage/distributor/distributor_stripe_thread.h
index 7015d27a53e..8b9453ab3f3 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe_thread.h
+++ b/storage/src/vespa/storage/distributor/distributor_stripe_thread.h
@@ -1,7 +1,6 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
-#include <vespa/fastos/thread.h>
#include <vespa/vespalib/util/time.h>
#include <atomic>
#include <condition_variable>
@@ -21,7 +20,7 @@ class TickableStripe;
* A DistributorStripeThread instance is bidirectionally bound to a particular pool and
* should therefore always be created by the pool itself (never standalone).
*/
-class DistributorStripeThread : private FastOS_Runnable {
+class DistributorStripeThread {
using AtomicDuration = std::atomic<vespalib::duration>;
TickableStripe& _stripe;
@@ -41,7 +40,7 @@ public:
DistributorStripePool& stripe_pool);
~DistributorStripeThread();
- void Run(FastOS_ThreadInterface*, void*) override;
+ void run();
// Wakes up stripe thread if it's currently waiting for an external event to be triggered,
// such as the arrival of a new RPC message. If thread is parked this call will have no
diff --git a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp
index 907918d97dc..3f015d91a4a 100644
--- a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp
+++ b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp
@@ -1,7 +1,6 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "rpc_target.h"
#include "shared_rpc_resources.h"
-#include <vespa/fastos/thread.h>
#include <vespa/fnet/frt/supervisor.h>
#include <vespa/fnet/frt/target.h>
#include <vespa/fnet/transport.h>
diff --git a/vespalib/src/tests/thread/thread_test.cpp b/vespalib/src/tests/thread/thread_test.cpp
index cde8a3596bf..077b85ea1ac 100644
--- a/vespalib/src/tests/thread/thread_test.cpp
+++ b/vespalib/src/tests/thread/thread_test.cpp
@@ -50,9 +50,17 @@ TEST("use thread pool to run multiple things") {
bool init_called = false;
bool was_run = false;
ThreadPool pool;
+ EXPECT_TRUE(pool.empty());
+ EXPECT_EQUAL(pool.size(), 0u);
pool.start(my_fun, &was_run);
+ EXPECT_TRUE(!pool.empty());
+ EXPECT_EQUAL(pool.size(), 1u);
pool.start(agent, wrap(test_agent_thread, &init_called));
+ EXPECT_TRUE(!pool.empty());
+ EXPECT_EQUAL(pool.size(), 2u);
pool.join();
+ EXPECT_TRUE(pool.empty());
+ EXPECT_EQUAL(pool.size(), 0u);
EXPECT_TRUE(init_called);
EXPECT_TRUE(agent.was_run);
EXPECT_TRUE(was_run);
diff --git a/vespalib/src/vespa/vespalib/util/thread.h b/vespalib/src/vespa/vespalib/util/thread.h
index 2a5693d2d26..9f3ebd89165 100644
--- a/vespalib/src/vespa/vespalib/util/thread.h
+++ b/vespalib/src/vespa/vespalib/util/thread.h
@@ -24,16 +24,18 @@ private:
public:
ThreadPool() noexcept : _threads() {}
void start(Runnable &runnable, Runnable::init_fun_t init_fun) {
- _threads.reserve(_threads.size() + 1);
+ reserve(size() + 1);
_threads.push_back(thread::start(runnable, std::move(init_fun)));
}
template<typename F, typename... Args>
requires std::invocable<F,Args...>
void start(F &&f, Args && ... args) {
- _threads.reserve(_threads.size() + 1);
+ reserve(size() + 1);
_threads.emplace_back(std::forward<F>(f), std::forward<Args>(args)...);
};
+ void reserve(size_t capacity) { _threads.reserve(capacity); }
size_t size() const { return _threads.size(); }
+ bool empty() const { return _threads.empty(); }
void join() {
for (auto &thread: _threads) {
thread.join();