diff options
author | Geir Storli <geirst@yahooinc.com> | 2023-02-24 17:15:11 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-02-24 17:15:11 +0100 |
commit | 5b1995344966b854ddbdf3810c3f5614c30bea2f (patch) | |
tree | cd5444d6c993e940ab0a274cfce31dfc80793714 | |
parent | f82bc56396e7251aca9c46a067d189afddb077cc (diff) | |
parent | a907c35031c5a09fb2fc76080f273ff95e663e65 (diff) |
Merge pull request #26175 from vespa-engine/havardpe/avoid-fastos-thread-in-storage
avoid using fastos thread in storage
9 files changed, 21 insertions, 24 deletions
diff --git a/storage/src/tests/common/dummystoragelink.h b/storage/src/tests/common/dummystoragelink.h index e8ccc38df76..8da92917c08 100644 --- a/storage/src/tests/common/dummystoragelink.h +++ b/storage/src/tests/common/dummystoragelink.h @@ -11,8 +11,6 @@ #include <vespa/storage/common/bucketmessages.h> #include <vespa/storageapi/message/internal.h> -class FastOS_ThreadPool; - namespace storage { class DummyStorageLink : public StorageLink { diff --git a/storage/src/tests/storageserver/statereportertest.cpp b/storage/src/tests/storageserver/statereportertest.cpp index 4788ed0230c..e9b2f04eb04 100644 --- a/storage/src/tests/storageserver/statereportertest.cpp +++ b/storage/src/tests/storageserver/statereportertest.cpp @@ -31,7 +31,6 @@ public: }; struct StateReporterTest : Test { - FastOS_ThreadPool _threadPool; framework::defaultimplementation::FakeClock* _clock; std::unique_ptr<TestServiceLayerApp> _node; std::unique_ptr<DummyStorageLink> _top; @@ -62,8 +61,7 @@ struct MetricClock : public metrics::MetricManager::Timer } StateReporterTest::StateReporterTest() - : _threadPool(), - _clock(nullptr), + : _clock(nullptr), _top(), _stateReporter() { diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_pool.cpp b/storage/src/vespa/storage/distributor/distributor_stripe_pool.cpp index 26ca8963783..ceadd20baca 100644 --- a/storage/src/vespa/storage/distributor/distributor_stripe_pool.cpp +++ b/storage/src/vespa/storage/distributor/distributor_stripe_pool.cpp @@ -8,8 +8,7 @@ namespace storage::distributor { DistributorStripePool::DistributorStripePool(bool test_mode, PrivateCtorTag) - : _thread_pool(std::make_unique<FastOS_ThreadPool>()), - _n_stripe_bits(0), + : _n_stripe_bits(0), _stripes(), _threads(), _mutex(), @@ -119,7 +118,7 @@ void DistributorStripePool::start(const std::vector<TickableStripe*>& stripes) { } std::unique_lock lock(_mutex); // Ensure _threads is visible to all started threads for (auto& s : _stripes) { - _threads.emplace_back(_thread_pool->NewThread(s.get())); + _threads.start([ptr = s.get()](){ ptr->run(); }); } } @@ -131,9 +130,7 @@ void DistributorStripePool::stop_and_join() { for (auto& s : _stripes) { s->signal_should_stop(); } - for (auto* t : _threads) { - t->Join(); - } + _threads.join(); } void DistributorStripePool::set_tick_wait_duration(vespalib::duration new_tick_wait_duration) noexcept { diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_pool.h b/storage/src/vespa/storage/distributor/distributor_stripe_pool.h index 00f5f57edf9..6ac95c27b76 100644 --- a/storage/src/vespa/storage/distributor/distributor_stripe_pool.h +++ b/storage/src/vespa/storage/distributor/distributor_stripe_pool.h @@ -2,14 +2,12 @@ #pragma once #include <vespa/vespalib/util/time.h> +#include <vespa/vespalib/util/thread.h> #include <atomic> #include <condition_variable> #include <mutex> #include <vector> -class FastOS_ThreadInterface; -class FastOS_ThreadPool; - namespace storage::distributor { class DistributorStripeThread; @@ -37,12 +35,10 @@ class TickableStripe; */ class DistributorStripePool { using StripeVector = std::vector<std::unique_ptr<DistributorStripeThread>>; - using NativeThreadVector = std::vector<FastOS_ThreadInterface*>; - std::unique_ptr<FastOS_ThreadPool> _thread_pool; uint8_t _n_stripe_bits; StripeVector _stripes; - NativeThreadVector _threads; + vespalib::ThreadPool _threads; std::mutex _mutex; std::condition_variable _parker_cond; size_t _parked_threads; // Must be protected by _park_mutex diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_thread.cpp b/storage/src/vespa/storage/distributor/distributor_stripe_thread.cpp index 8f37dbbbf5d..72854d9af75 100644 --- a/storage/src/vespa/storage/distributor/distributor_stripe_thread.cpp +++ b/storage/src/vespa/storage/distributor/distributor_stripe_thread.cpp @@ -23,7 +23,7 @@ DistributorStripeThread::DistributorStripeThread(TickableStripe& stripe, DistributorStripeThread::~DistributorStripeThread() = default; -void DistributorStripeThread::Run(FastOS_ThreadInterface*, void*) { +void DistributorStripeThread::run() { uint32_t tick_waits_inhibited = 0; while (!should_stop_thread_relaxed()) { while (should_park_relaxed()) { diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_thread.h b/storage/src/vespa/storage/distributor/distributor_stripe_thread.h index 7015d27a53e..8b9453ab3f3 100644 --- a/storage/src/vespa/storage/distributor/distributor_stripe_thread.h +++ b/storage/src/vespa/storage/distributor/distributor_stripe_thread.h @@ -1,7 +1,6 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once -#include <vespa/fastos/thread.h> #include <vespa/vespalib/util/time.h> #include <atomic> #include <condition_variable> @@ -21,7 +20,7 @@ class TickableStripe; * A DistributorStripeThread instance is bidirectionally bound to a particular pool and * should therefore always be created by the pool itself (never standalone). */ -class DistributorStripeThread : private FastOS_Runnable { +class DistributorStripeThread { using AtomicDuration = std::atomic<vespalib::duration>; TickableStripe& _stripe; @@ -41,7 +40,7 @@ public: DistributorStripePool& stripe_pool); ~DistributorStripeThread(); - void Run(FastOS_ThreadInterface*, void*) override; + void run(); // Wakes up stripe thread if it's currently waiting for an external event to be triggered, // such as the arrival of a new RPC message. If thread is parked this call will have no diff --git a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp index 907918d97dc..3f015d91a4a 100644 --- a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp +++ b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp @@ -1,7 +1,6 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "rpc_target.h" #include "shared_rpc_resources.h" -#include <vespa/fastos/thread.h> #include <vespa/fnet/frt/supervisor.h> #include <vespa/fnet/frt/target.h> #include <vespa/fnet/transport.h> diff --git a/vespalib/src/tests/thread/thread_test.cpp b/vespalib/src/tests/thread/thread_test.cpp index cde8a3596bf..077b85ea1ac 100644 --- a/vespalib/src/tests/thread/thread_test.cpp +++ b/vespalib/src/tests/thread/thread_test.cpp @@ -50,9 +50,17 @@ TEST("use thread pool to run multiple things") { bool init_called = false; bool was_run = false; ThreadPool pool; + EXPECT_TRUE(pool.empty()); + EXPECT_EQUAL(pool.size(), 0u); pool.start(my_fun, &was_run); + EXPECT_TRUE(!pool.empty()); + EXPECT_EQUAL(pool.size(), 1u); pool.start(agent, wrap(test_agent_thread, &init_called)); + EXPECT_TRUE(!pool.empty()); + EXPECT_EQUAL(pool.size(), 2u); pool.join(); + EXPECT_TRUE(pool.empty()); + EXPECT_EQUAL(pool.size(), 0u); EXPECT_TRUE(init_called); EXPECT_TRUE(agent.was_run); EXPECT_TRUE(was_run); diff --git a/vespalib/src/vespa/vespalib/util/thread.h b/vespalib/src/vespa/vespalib/util/thread.h index 2a5693d2d26..9f3ebd89165 100644 --- a/vespalib/src/vespa/vespalib/util/thread.h +++ b/vespalib/src/vespa/vespalib/util/thread.h @@ -24,16 +24,18 @@ private: public: ThreadPool() noexcept : _threads() {} void start(Runnable &runnable, Runnable::init_fun_t init_fun) { - _threads.reserve(_threads.size() + 1); + reserve(size() + 1); _threads.push_back(thread::start(runnable, std::move(init_fun))); } template<typename F, typename... Args> requires std::invocable<F,Args...> void start(F &&f, Args && ... args) { - _threads.reserve(_threads.size() + 1); + reserve(size() + 1); _threads.emplace_back(std::forward<F>(f), std::forward<Args>(args)...); }; + void reserve(size_t capacity) { _threads.reserve(capacity); } size_t size() const { return _threads.size(); } + bool empty() const { return _threads.empty(); } void join() { for (auto &thread: _threads) { thread.join(); |