diff options
39 files changed, 73 insertions, 179 deletions
diff --git a/config/src/tests/file_acquirer/file_acquirer_test.cpp b/config/src/tests/file_acquirer/file_acquirer_test.cpp index 8449a33a782..8e11f964f16 100644 --- a/config/src/tests/file_acquirer/file_acquirer_test.cpp +++ b/config/src/tests/file_acquirer/file_acquirer_test.cpp @@ -11,7 +11,6 @@ using namespace config; struct ServerFixture : FRT_Invokable { fnet::frt::StandaloneFRT server; - FastOS_ThreadPool threadPool; FNET_Transport transport; FRT_Supervisor &orb; vespalib::string spec; @@ -26,14 +25,13 @@ struct ServerFixture : FRT_Invokable { ServerFixture() : server(), - threadPool(), transport(), orb(server.supervisor()) { init_rpc(); orb.Listen(0); spec = vespalib::make_string("tcp/localhost:%d", orb.GetListenPort()); - transport.Start(&threadPool); + transport.Start(); } void RPC_waitFor(FRT_RPCRequest *req) { diff --git a/config/src/tests/frtconnectionpool/frtconnectionpool.cpp b/config/src/tests/frtconnectionpool/frtconnectionpool.cpp index 9c3498a92a1..834b1797ed0 100644 --- a/config/src/tests/frtconnectionpool/frtconnectionpool.cpp +++ b/config/src/tests/frtconnectionpool/frtconnectionpool.cpp @@ -4,7 +4,6 @@ #include <vespa/config/frt/frtconnectionpool.h> #include <vespa/fnet/frt/error.h> #include <vespa/fnet/transport.h> -#include <vespa/fastos/thread.h> #include <sstream> #include <set> #include <unistd.h> @@ -14,7 +13,6 @@ using namespace config; class Test : public vespalib::TestApp { private: static ServerSpec::HostSpecList _sources; - FastOS_ThreadPool _threadPool; FNET_Transport _transport; void verifyAllSourcesInRotation(FRTConnectionPool& sourcePool); public: @@ -33,10 +31,9 @@ public: Test::Test() : vespalib::TestApp(), - _threadPool(), _transport() { - _transport.Start(&_threadPool); + _transport.Start(); } Test::~Test() { diff --git a/config/src/vespa/config/frt/frtconnectionpool.cpp b/config/src/vespa/config/frt/frtconnectionpool.cpp index 21d6f0dbe90..de00c650628 100644 --- a/config/src/vespa/config/frt/frtconnectionpool.cpp +++ b/config/src/vespa/config/frt/frtconnectionpool.cpp @@ -167,14 +167,12 @@ FRTConnectionPool::getScheduler() { return _supervisor->GetScheduler(); } -FRTConnectionPoolWithTransport::FRTConnectionPoolWithTransport(std::unique_ptr<FastOS_ThreadPool> threadPool, - std::unique_ptr<FNET_Transport> transport, +FRTConnectionPoolWithTransport::FRTConnectionPoolWithTransport(std::unique_ptr<FNET_Transport> transport, const ServerSpec & spec, const TimingValues & timingValues) - : _threadPool(std::move(threadPool)), - _transport(std::move(transport)), - _connectionPool(std::make_unique<FRTConnectionPool>(*_transport, spec, timingValues)) + : _transport(std::move(transport)), + _connectionPool(std::make_unique<FRTConnectionPool>(*_transport, spec, timingValues)) { - _transport->Start(_threadPool.get()); + _transport->Start(); } FRTConnectionPoolWithTransport::~FRTConnectionPoolWithTransport() diff --git a/config/src/vespa/config/frt/frtconnectionpool.h b/config/src/vespa/config/frt/frtconnectionpool.h index 564c6506159..5d97f2ae338 100644 --- a/config/src/vespa/config/frt/frtconnectionpool.h +++ b/config/src/vespa/config/frt/frtconnectionpool.h @@ -8,7 +8,6 @@ #include <map> class FNET_Transport; -class FastOS_ThreadPool; namespace config { @@ -103,8 +102,7 @@ public: class FRTConnectionPoolWithTransport : public ConnectionFactory { public: - FRTConnectionPoolWithTransport(std::unique_ptr<FastOS_ThreadPool> threadPool, - std::unique_ptr<FNET_Transport> transport, + FRTConnectionPoolWithTransport(std::unique_ptr<FNET_Transport> transport, const ServerSpec & spec, const TimingValues & timingValues); FRTConnectionPoolWithTransport(const FRTConnectionPoolWithTransport&) = delete; FRTConnectionPoolWithTransport& operator=(const FRTConnectionPoolWithTransport&) = delete; @@ -113,7 +111,6 @@ public: void syncTransport() override { _connectionPool->syncTransport(); } Connection* getCurrent() override { return _connectionPool->getCurrent(); } private: - std::unique_ptr<FastOS_ThreadPool> _threadPool; std::unique_ptr<FNET_Transport> _transport; std::unique_ptr<FRTConnectionPool> _connectionPool; }; diff --git a/config/src/vespa/config/subscription/sourcespec.cpp b/config/src/vespa/config/subscription/sourcespec.cpp index c05f639f9ba..0bdcf2acbc5 100644 --- a/config/src/vespa/config/subscription/sourcespec.cpp +++ b/config/src/vespa/config/subscription/sourcespec.cpp @@ -127,8 +127,7 @@ ServerSpec::createSourceFactory(const TimingValues & timingValues) const { const auto vespaVersion = VespaVersion::getCurrentVersion(); return std::make_unique<FRTSourceFactory>( - std::make_unique<FRTConnectionPoolWithTransport>(std::make_unique<FastOS_ThreadPool>(), - std::make_unique<FNET_Transport>(), + std::make_unique<FRTConnectionPoolWithTransport>(std::make_unique<FNET_Transport>(), *this, timingValues), timingValues, _traceLevel, vespaVersion, _compressionType); } diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/mirror_with_all.cpp b/documentapi/src/vespa/documentapi/messagebus/policies/mirror_with_all.cpp index 0841c2ed32b..6e9a00dcdb6 100644 --- a/documentapi/src/vespa/documentapi/messagebus/policies/mirror_with_all.cpp +++ b/documentapi/src/vespa/documentapi/messagebus/policies/mirror_with_all.cpp @@ -10,12 +10,11 @@ namespace documentapi { MirrorAndStuff::MirrorAndStuff(const slobrok::ConfiguratorFactory & config) - : _threadPool(std::make_unique<FastOS_ThreadPool>()), - _transport(std::make_unique<FNET_Transport>()), - _orb(std::make_unique<FRT_Supervisor>(_transport.get())), - _mirror(std::make_unique<slobrok::api::MirrorAPI>(*_orb, config)) + : _transport(std::make_unique<FNET_Transport>()), + _orb(std::make_unique<FRT_Supervisor>(_transport.get())), + _mirror(std::make_unique<slobrok::api::MirrorAPI>(*_orb, config)) { - _transport->Start(_threadPool.get()); + _transport->Start(); } MirrorAndStuff::~MirrorAndStuff() { diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/mirror_with_all.h b/documentapi/src/vespa/documentapi/messagebus/policies/mirror_with_all.h index 05571c4420e..ed5dd459768 100644 --- a/documentapi/src/vespa/documentapi/messagebus/policies/mirror_with_all.h +++ b/documentapi/src/vespa/documentapi/messagebus/policies/mirror_with_all.h @@ -4,7 +4,6 @@ #include <memory> -class FastOS_ThreadPool; class FNET_Transport; class FRT_Supervisor; namespace slobrok { class ConfiguratorFactory; } @@ -18,7 +17,6 @@ public: ~MirrorAndStuff(); slobrok::api::IMirrorAPI * mirror() { return _mirror.get(); } private: - std::unique_ptr<FastOS_ThreadPool> _threadPool; std::unique_ptr<FNET_Transport> _transport; std::unique_ptr<FRT_Supervisor> _orb; std::unique_ptr<slobrok::api::IMirrorAPI> _mirror; diff --git a/fnet/CMakeLists.txt b/fnet/CMakeLists.txt index eeef7f63876..6d8836817e0 100644 --- a/fnet/CMakeLists.txt +++ b/fnet/CMakeLists.txt @@ -1,7 +1,6 @@ # Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. vespa_define_module( DEPENDS - fastos vespalog vespalib diff --git a/fnet/INSTALL b/fnet/INSTALL deleted file mode 100644 index 9bb6c6152a4..00000000000 --- a/fnet/INSTALL +++ /dev/null @@ -1,22 +0,0 @@ -********************************* -* Compiling and Installing fnet * -********************************* - -'fnet' uses 'FastOS'. -In the following instructions, let %FASTOS_DIR% denote the install -directory for FastOS. A reasonable install directory would be: - %FASTOS_DIR% = '/usr/fastsearch/fastos' - -Install FastOS: -- checkout the fastos CVS module -- go to fastos/src/fastos -- ./configure --install-dir %FASTOS_DIR% [<config parameters>] - (run ./configure --help for help) -- make install - -Install fnet: -- checkout the fnet CVS module -- go to fnet/src -- ./configure --fastos-dir %FASTOS_DIR% [<config parameters>] - (run ./configure --fastos-dir %FASTOS_DIR% --help for help) -- make install diff --git a/fnet/README b/fnet/README deleted file mode 100644 index dab36e5b120..00000000000 --- a/fnet/README +++ /dev/null @@ -1,26 +0,0 @@ -This cvs module contains the code for FNET and FRT -FNET currently stands for 'FuNET' -FRT currently stands for 'FNET Remote Tools' - -FNET is a multi-threaded object-oriented networking library based on -FastOS. FRT implements a proprietary RPC protocol on top of FNET. - -For how to compile and install, read the INSTALL file. -For release information, read the RELEASEINFO file. - -The maintainer of this module is [havardpe@yahoo-inc.com] - -Notes from the maintainer: - -This library is work in progress. This means that some APIs may change -in new versions. I will try to keep these changes to a minimum and -also document them in the RELEASEINFO file. - -No extra effort has been added to hide things from the users of this -library. This means that you can use components like the scheduler to -implement a scheduler thread that has nothing to do with -networking. It also means that you have to expect to handle more API -changes if you are using classes not intended for direct use. It also -means that you will have access to methods you were never meant to -invoke. If this becomes a problem, I will start making things private -and the classes more 'friendly'. diff --git a/fnet/src/examples/ping/pingclient.cpp b/fnet/src/examples/ping/pingclient.cpp index 9b32e40ac83..43296df7e57 100644 --- a/fnet/src/examples/ping/pingclient.cpp +++ b/fnet/src/examples/ping/pingclient.cpp @@ -6,7 +6,6 @@ #include <vespa/fnet/connection.h> #include <examples/ping/packets.h> #include <vespa/vespalib/util/signalhandler.h> -#include <vespa/fastos/thread.h> #include <vespa/log/log.h> LOG_SETUP("pingclient"); @@ -28,7 +27,6 @@ PingClient::main(int argc, char **argv) } FNET_PacketQueue queue; - FastOS_ThreadPool pool; PingPacketFactory factory; FNET_SimplePacketStreamer streamer(&factory); FNET_Transport transport; @@ -39,7 +37,7 @@ PingClient::main(int argc, char **argv) if (argc == 3) { timeout_ms = atof(argv[2]) * 1000; } - transport.Start(&pool); + transport.Start(); uint32_t channelCnt = 0; for (uint32_t i = 0; i < 10; i++) { @@ -90,7 +88,6 @@ PingClient::main(int argc, char **argv) if (conn != nullptr) conn->SubRef(); transport.ShutDown(true); - pool.Close(); return 0; } diff --git a/fnet/src/examples/timeout/timeout.cpp b/fnet/src/examples/timeout/timeout.cpp index 41de852d48c..e0830c7cde1 100644 --- a/fnet/src/examples/timeout/timeout.cpp +++ b/fnet/src/examples/timeout/timeout.cpp @@ -5,7 +5,6 @@ #include <vespa/fnet/packetqueue.h> #include <vespa/fnet/controlpacket.h> #include <vespa/vespalib/util/signalhandler.h> -#include <vespa/fastos/thread.h> #include <vespa/vespalib/util/time.h> #include <thread> @@ -54,10 +53,9 @@ MyApp::main(int, char **) ms_double ms; clock::time_point t; FNET_PacketQueue queue; - FastOS_ThreadPool pool; FNET_Transport transport; Timeout timeout(transport.GetScheduler(), &queue); - transport.Start(&pool); + transport.Start(); // stable-state operation std::this_thread::sleep_for(100ms); @@ -90,7 +88,6 @@ MyApp::main(int, char **) fprintf(stderr, "time since timeout was scheduled: %f ms\n", ms.count()); transport.ShutDown(true); - pool.Close(); return 0; } diff --git a/fnet/src/tests/connect/connect_test.cpp b/fnet/src/tests/connect/connect_test.cpp index 681c3b7676a..9d566cf37a7 100644 --- a/fnet/src/tests/connect/connect_test.cpp +++ b/fnet/src/tests/connect/connect_test.cpp @@ -88,26 +88,25 @@ struct BlockingCryptoEngine : public CryptoEngine { struct TransportFixture : FNET_IPacketHandler, FNET_IConnectionCleanupHandler { FNET_SimplePacketStreamer streamer; - FastOS_ThreadPool pool; FNET_Transport transport; Gate conn_lost; Gate conn_deleted; - TransportFixture() : streamer(nullptr), pool(), transport(), + TransportFixture() : streamer(nullptr), transport(), conn_lost(), conn_deleted() { - transport.Start(&pool); + transport.Start(); } TransportFixture(AsyncResolver::HostResolver::SP host_resolver) - : streamer(nullptr), pool(), transport(fnet::TransportConfig().resolver(make_resolver(std::move(host_resolver)))), + : streamer(nullptr), transport(fnet::TransportConfig().resolver(make_resolver(std::move(host_resolver)))), conn_lost(), conn_deleted() { - transport.Start(&pool); + transport.Start(); } TransportFixture(CryptoEngine::SP crypto) - : streamer(nullptr), pool(), transport(fnet::TransportConfig().crypto(std::move(crypto))), + : streamer(nullptr), transport(fnet::TransportConfig().crypto(std::move(crypto))), conn_lost(), conn_deleted() { - transport.Start(&pool); + transport.Start(); } HP_RetCode HandlePacket(FNET_Packet *packet, FNET_Context) override { ASSERT_TRUE(packet->GetCommand() == FNET_ControlPacket::FNET_CMD_CHANNEL_LOST); @@ -127,7 +126,6 @@ struct TransportFixture : FNET_IPacketHandler, FNET_IConnectionCleanupHandler { } ~TransportFixture() override { transport.ShutDown(true); - pool.Close(); } }; diff --git a/fnet/src/tests/connection_spread/connection_spread_test.cpp b/fnet/src/tests/connection_spread/connection_spread_test.cpp index d65e4fb70fe..6286ce65657 100644 --- a/fnet/src/tests/connection_spread/connection_spread_test.cpp +++ b/fnet/src/tests/connection_spread/connection_spread_test.cpp @@ -6,7 +6,6 @@ #include <vespa/fnet/ipacketstreamer.h> #include <vespa/fnet/connector.h> #include <vespa/fnet/connection.h> -#include <vespa/fastos/thread.h> #include <vespa/vespalib/util/size_literals.h> #include <vespa/vespalib/util/stringfmt.h> #include <thread> @@ -28,13 +27,12 @@ struct DummyStreamer : FNET_IPacketStreamer { struct Fixture { DummyStreamer streamer; DummyAdapter adapter; - FastOS_ThreadPool thread_pool; FNET_Transport client; FNET_Transport server; - Fixture() : streamer(), adapter(), thread_pool(), client(8), server(8) + Fixture() : streamer(), adapter(), client(8), server(8) { - ASSERT_TRUE(client.Start(&thread_pool)); - ASSERT_TRUE(server.Start(&thread_pool)); + ASSERT_TRUE(client.Start()); + ASSERT_TRUE(server.Start()); } void wait_for_components(size_t client_cnt, size_t server_cnt) { bool ok = false; @@ -49,7 +47,6 @@ struct Fixture { ~Fixture() { server.ShutDown(true); client.ShutDown(true); - thread_pool.Close(); } }; diff --git a/fnet/src/tests/frt/detach_supervisor/detach_supervisor_test.cpp b/fnet/src/tests/frt/detach_supervisor/detach_supervisor_test.cpp index 716c433ff61..6325c60413a 100644 --- a/fnet/src/tests/frt/detach_supervisor/detach_supervisor_test.cpp +++ b/fnet/src/tests/frt/detach_supervisor/detach_supervisor_test.cpp @@ -10,7 +10,6 @@ #include <vespa/vespalib/util/size_literals.h> #include <vespa/vespalib/util/stringfmt.h> #include <vespa/vespalib/util/time.h> -#include <vespa/fastos/thread.h> #include <thread> using namespace vespalib; @@ -19,14 +18,12 @@ using vespalib::make_string_short::fmt; CryptoEngine::SP null_crypto = std::make_shared<NullCryptoEngine>(); struct BasicFixture { - FastOS_ThreadPool thread_pool; FNET_Transport transport; - BasicFixture() : thread_pool(), transport(fnet::TransportConfig(4).crypto(null_crypto)) { - ASSERT_TRUE(transport.Start(&thread_pool)); + BasicFixture() : transport(fnet::TransportConfig(4).crypto(null_crypto)) { + ASSERT_TRUE(transport.Start()); } ~BasicFixture() { transport.ShutDown(true); - thread_pool.Close(); } }; diff --git a/fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp b/fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp index 624f5a73ae6..41bfb7d06a6 100644 --- a/fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp +++ b/fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp @@ -4,7 +4,6 @@ #include <vespa/fnet/frt/rpcrequest.h> #include <vespa/fnet/frt/target.h> #include <vespa/fnet/transport.h> -#include <vespa/fastos/thread.h> #include <vespa/vespalib/util/benchmark_timer.h> #include <vespa/vespalib/net/crypto_engine.h> #include <vespa/vespalib/net/tls/tls_crypto_engine.h> @@ -15,13 +14,12 @@ using namespace vespalib; struct Rpc : FRT_Invokable { - FastOS_ThreadPool thread_pool; FNET_Transport transport; FRT_Supervisor orb; Rpc(CryptoEngine::SP crypto, size_t num_threads, bool drop_empty) - : thread_pool(), transport(fnet::TransportConfig(num_threads).crypto(std::move(crypto)).drop_empty_buffers(drop_empty)), orb(&transport) {} + : transport(fnet::TransportConfig(num_threads).crypto(std::move(crypto)).drop_empty_buffers(drop_empty)), orb(&transport) {} void start() { - ASSERT_TRUE(transport.Start(&thread_pool)); + ASSERT_TRUE(transport.Start()); } uint32_t listen() { ASSERT_TRUE(orb.Listen(0)); @@ -32,7 +30,6 @@ struct Rpc : FRT_Invokable { } ~Rpc() override { transport.ShutDown(true); - thread_pool.Close(); } }; diff --git a/fnet/src/tests/sync_execute/sync_execute.cpp b/fnet/src/tests/sync_execute/sync_execute.cpp index b8fa21cf147..5d2f4097ab4 100644 --- a/fnet/src/tests/sync_execute/sync_execute.cpp +++ b/fnet/src/tests/sync_execute/sync_execute.cpp @@ -4,7 +4,6 @@ #include <vespa/vespalib/util/size_literals.h> #include <vespa/fnet/transport.h> #include <vespa/fnet/iexecutable.h> -#include <vespa/fastos/thread.h> struct DoIt : public FNET_IExecutable { vespalib::Gate gate; @@ -18,10 +17,9 @@ TEST("sync execute") { DoIt exe2; DoIt exe3; DoIt exe4; - FastOS_ThreadPool pool; FNET_Transport transport; ASSERT_TRUE(transport.execute(&exe1)); - ASSERT_TRUE(transport.Start(&pool)); + ASSERT_TRUE(transport.Start()); exe1.gate.await(); ASSERT_TRUE(transport.execute(&exe2)); transport.sync(); @@ -32,7 +30,6 @@ TEST("sync execute") { transport.sync(); transport.WaitFinished(); transport.sync(); - pool.Close(); ASSERT_TRUE(exe1.gate.getCount() == 0u); ASSERT_TRUE(exe2.gate.getCount() == 0u); ASSERT_TRUE(exe3.gate.getCount() == 0u); diff --git a/fnet/src/vespa/fnet/frt/supervisor.cpp b/fnet/src/vespa/fnet/frt/supervisor.cpp index b08516c5009..966c606bf97 100644 --- a/fnet/src/vespa/fnet/frt/supervisor.cpp +++ b/fnet/src/vespa/fnet/frt/supervisor.cpp @@ -7,7 +7,6 @@ #include <vespa/fnet/transport.h> #include <vespa/fnet/transport_thread.h> #include <vespa/fnet/connector.h> -#include <vespa/fastos/thread.h> #include <vespa/vespalib/util/require.h> FNET_IPacketStreamer * @@ -291,11 +290,10 @@ FRT_Supervisor::SchedulerPtr::SchedulerPtr(FNET_TransportThread *transport_threa namespace fnet::frt { StandaloneFRT::StandaloneFRT(const TransportConfig &config) - : _threadPool(std::make_unique<FastOS_ThreadPool>()), - _transport(std::make_unique<FNET_Transport>(config)), + : _transport(std::make_unique<FNET_Transport>(config)), _supervisor(std::make_unique<FRT_Supervisor>(_transport.get())) { - REQUIRE(_transport->Start(_threadPool.get())); + REQUIRE(_transport->Start()); } StandaloneFRT::StandaloneFRT() diff --git a/fnet/src/vespa/fnet/frt/supervisor.h b/fnet/src/vespa/fnet/frt/supervisor.h index 93272e93b4a..0261c7863b9 100644 --- a/fnet/src/vespa/fnet/frt/supervisor.h +++ b/fnet/src/vespa/fnet/frt/supervisor.h @@ -13,7 +13,6 @@ namespace fnet { class TransportConfig; } class FNET_Transport; class FRT_Target; -class FastOS_ThreadPool; class FNET_Scheduler; class FRT_RPCInvoker; class FRT_IRequestWait; @@ -106,7 +105,6 @@ public: const FRT_Supervisor &supervisor() const { return *_supervisor; } void shutdown(); private: - std::unique_ptr<FastOS_ThreadPool> _threadPool; std::unique_ptr<FNET_Transport> _transport; std::unique_ptr<FRT_Supervisor> _supervisor; }; diff --git a/fnet/src/vespa/fnet/transport.cpp b/fnet/src/vespa/fnet/transport.cpp index ae864ea0821..1553fc010c0 100644 --- a/fnet/src/vespa/fnet/transport.cpp +++ b/fnet/src/vespa/fnet/transport.cpp @@ -136,6 +136,7 @@ FNET_Transport::FNET_Transport(const fnet::TransportConfig &cfg) _time_tools(cfg.time_tools()), _work_pool(std::make_unique<vespalib::ThreadStackExecutor>(1, fnet_work_pool, 1024)), _threads(), + _pool(), _config(cfg.config()) { // TODO Temporary logging to track down overspend @@ -146,7 +147,10 @@ FNET_Transport::FNET_Transport(const fnet::TransportConfig &cfg) } } -FNET_Transport::~FNET_Transport() = default; +FNET_Transport::~FNET_Transport() +{ + _pool.join(); +} void FNET_Transport::post_or_perform(vespalib::Executor::Task::UP task) @@ -266,13 +270,12 @@ FNET_Transport::WaitFinished() } bool -FNET_Transport::Start(FastOS_ThreadPool *pool) +FNET_Transport::Start() { - bool result = true; for (const auto &thread: _threads) { - result &= thread->Start(pool); + thread->Start(_pool); } - return result; + return true; } void diff --git a/fnet/src/vespa/fnet/transport.h b/fnet/src/vespa/fnet/transport.h index 3f9328296d6..d658059f0bb 100644 --- a/fnet/src/vespa/fnet/transport.h +++ b/fnet/src/vespa/fnet/transport.h @@ -7,9 +7,9 @@ #include <vespa/vespalib/net/async_resolver.h> #include <vespa/vespalib/net/crypto_engine.h> #include <vespa/vespalib/util/time.h> +#include <vespa/vespalib/util/thread.h> class FNET_TransportThread; -class FastOS_ThreadPool; class FNET_Connector; class FNET_IPacketStreamer; class FNET_IServerAdapter; @@ -111,6 +111,7 @@ private: fnet::TimeTools::SP _time_tools; std::unique_ptr<vespalib::SyncableThreadExecutor> _work_pool; Threads _threads; + vespalib::ThreadPool _pool; const FNET_Config _config; /** @@ -317,9 +318,8 @@ public: * ok. * * @return thread create status. - * @param pool threadpool that may be used to spawn new threads. **/ - bool Start(FastOS_ThreadPool *pool); + bool Start(); /** * Capture transport threads. Used for testing purposes, diff --git a/fnet/src/vespa/fnet/transport_thread.cpp b/fnet/src/vespa/fnet/transport_thread.cpp index 262cdaec190..970dc40150f 100644 --- a/fnet/src/vespa/fnet/transport_thread.cpp +++ b/fnet/src/vespa/fnet/transport_thread.cpp @@ -598,28 +598,28 @@ FNET_TransportThread::endEventLoop() { bool -FNET_TransportThread::Start(FastOS_ThreadPool *pool) +FNET_TransportThread::Start(vespalib::ThreadPool &pool) { - return (pool != nullptr && pool->NewThread(this)); + pool.start([this](){run();}); + return true; } void FNET_TransportThread::Main() { - Run(nullptr, nullptr); + run(); } void -FNET_TransportThread::Run(FastOS_ThreadInterface *thisThread, void *) +FNET_TransportThread::run() { if (!InitEventLoop()) { LOG(warning, "Transport: Run: Could not init event loop"); return; } while (EventLoopIteration()) { - if (thisThread != nullptr && thisThread->GetBreakFlag()) - ShutDown(false); + // event loop must be stopped from the outside } } diff --git a/fnet/src/vespa/fnet/transport_thread.h b/fnet/src/vespa/fnet/transport_thread.h index 1911b11a81c..1744a3d60e5 100644 --- a/fnet/src/vespa/fnet/transport_thread.h +++ b/fnet/src/vespa/fnet/transport_thread.h @@ -6,9 +6,9 @@ #include "config.h" #include "task.h" #include "packetqueue.h" -#include <vespa/fastos/thread.h> #include <vespa/vespalib/net/socket_handle.h> #include <vespa/vespalib/net/selector.h> +#include <vespa/vespalib/util/thread.h> #include <atomic> #include <mutex> #include <condition_variable> @@ -26,7 +26,7 @@ class FNET_IServerAdapter; * the network related work for the application in both client and * server aspects. **/ -class FNET_TransportThread : public FastOS_Runnable +class FNET_TransportThread { friend class FNET_IOComponent; @@ -195,7 +195,7 @@ public: * Destruct object. This should NOT be done before the transport * thread has completed it's work and raised the finished flag. **/ - ~FNET_TransportThread() override; + ~FNET_TransportThread(); /** @@ -425,7 +425,7 @@ public: * @return thread create status. * @param pool threadpool that may be used to spawn a new thread. **/ - bool Start(FastOS_ThreadPool *pool); + bool Start(vespalib::ThreadPool &pool); /** @@ -440,5 +440,5 @@ public: * This is where the transport thread lives, when started by * invoking one of the @ref Main or @ref Start methods. **/ - void Run(FastOS_ThreadInterface *thisThread, void *args) override; + void run(); }; diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp index 030e3f956e1..3a13534220f 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp @@ -95,7 +95,6 @@ RPCNetwork::SendContext::handleVersion(const vespalib::Version *version) RPCNetwork::RPCNetwork(const RPCNetworkParams ¶ms) : _owner(nullptr), _ident(params.getIdentity()), - _threadPool(std::make_unique<FastOS_ThreadPool>()), _transport(std::make_unique<FNET_Transport>(toFNETConfig(params))), _orb(std::make_unique<FRT_Supervisor>(_transport.get())), _scheduler(*_transport->GetScheduler()), @@ -196,7 +195,7 @@ RPCNetwork::getSendAdapter(const vespalib::Version &version) bool RPCNetwork::start() { - if (!_transport->Start(_threadPool.get())) { + if (!_transport->Start()) { return false; } if (!_orb->Listen(_requestedPort)) { @@ -391,7 +390,6 @@ RPCNetwork::shutdown() // Unschedule any pending target pool flush task that may race with shutdown target flushing _scheduler.Kill(_targetPoolTask.get()); _transport->ShutDown(true); - _threadPool->Close(); } void diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.h b/messagebus/src/vespa/messagebus/network/rpcnetwork.h index 0d2435e5dcd..8e296981458 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetwork.h +++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.h @@ -16,7 +16,6 @@ #include <vespa/fnet/frt/invokable.h> class FNET_Transport; -class FastOS_ThreadPool; namespace slobrok { namespace api { class RegisterAPI; } @@ -56,7 +55,6 @@ private: INetworkOwner *_owner; Identity _ident; - std::unique_ptr<FastOS_ThreadPool> _threadPool; std::unique_ptr<FNET_Transport> _transport; std::unique_ptr<FRT_Supervisor> _orb; FNET_Scheduler &_scheduler; diff --git a/searchcore/src/apps/proton/proton.cpp b/searchcore/src/apps/proton/proton.cpp index 2c47b5162ca..4d21143935b 100644 --- a/searchcore/src/apps/proton/proton.cpp +++ b/searchcore/src/apps/proton/proton.cpp @@ -206,10 +206,10 @@ buildTransportConfig() { class Transport { public: - Transport(const fnet::TransportConfig & config, FastOS_ThreadPool & threadPool) + Transport(const fnet::TransportConfig & config) : _transport(config) { - _transport.Start(&threadPool); + _transport.Start(); } ~Transport() { _transport.ShutDown(true); @@ -284,7 +284,7 @@ App::main(int argc, char **argv) setupSignals(); setup_fadvise(); FastOS_ThreadPool threadPool; - Transport transport(buildTransportConfig(), threadPool); + Transport transport(buildTransportConfig()); startAndRun(threadPool, transport.transport(), argc, argv); } catch (const vespalib::InvalidCommandLineArgumentsException &e) { LOG(warning, "Invalid commandline arguments: '%s'", e.what()); diff --git a/searchcore/src/apps/vespa-transactionlog-inspect/vespa-transactionlog-inspect.cpp b/searchcore/src/apps/vespa-transactionlog-inspect/vespa-transactionlog-inspect.cpp index 76fc875b209..d69c978729d 100644 --- a/searchcore/src/apps/vespa-transactionlog-inspect/vespa-transactionlog-inspect.cpp +++ b/searchcore/src/apps/vespa-transactionlog-inspect/vespa-transactionlog-inspect.cpp @@ -379,7 +379,6 @@ class BaseUtility : public Utility protected: const BaseOptions &_bopts; DummyFileHeaderContext _fileHeader; - FastOS_ThreadPool _threadPool; FNET_Transport _transport; TransLogServer _server; client::TransLogClient _client; @@ -388,12 +387,11 @@ public: BaseUtility(const BaseOptions &bopts) : _bopts(bopts), _fileHeader(), - _threadPool(), _transport(), _server(_transport, _bopts.tlsName, _bopts.listenPort, _bopts.tlsDir, _fileHeader), _client(_transport, vespalib::make_string("tcp/localhost:%d", _bopts.listenPort)) { - _transport.Start(&_threadPool); + _transport.Start(); } ~BaseUtility() override { _transport.ShutDown(true); diff --git a/searchcore/src/tests/proton/common/timer/timer_test.cpp b/searchcore/src/tests/proton/common/timer/timer_test.cpp index ac82767cd7c..e3df6b319a9 100644 --- a/searchcore/src/tests/proton/common/timer/timer_test.cpp +++ b/searchcore/src/tests/proton/common/timer/timer_test.cpp @@ -46,17 +46,15 @@ make_scheduled_executor<ScheduledForwardExecutor>(FNET_Transport& transport, ves template <typename ScheduledT> class ScheduledExecutorTest : public testing::Test { public: - FastOS_ThreadPool threadPool; FNET_Transport transport; vespalib::ThreadStackExecutor executor; std::unique_ptr<ScheduledT> timer; ScheduledExecutorTest() - : threadPool(), - transport(), + : transport(), executor(1) { - transport.Start(&threadPool); + transport.Start(); timer = make_scheduled_executor<ScheduledT>(transport, executor); } ~ScheduledExecutorTest() { diff --git a/searchcore/src/tests/proton/proton_config_fetcher/proton_config_fetcher_test.cpp b/searchcore/src/tests/proton/proton_config_fetcher/proton_config_fetcher_test.cpp index afd224c55de..4b6ffd07b8a 100644 --- a/searchcore/src/tests/proton/proton_config_fetcher/proton_config_fetcher_test.cpp +++ b/searchcore/src/tests/proton/proton_config_fetcher/proton_config_fetcher_test.cpp @@ -305,7 +305,8 @@ TEST_FFF("require that proton config fetcher follows changes to bootstrap", ConfigTestFixture("search"), ProtonConfigOwner(), ProtonConfigFetcher(f1.transport.transport(), ConfigUri(f1.configId, f1.context), f2, 60s)) { - f3.start(f1.transport.threadPool()); + FastOS_ThreadPool pool; + f3.start(pool); ASSERT_TRUE(f2._configured); ASSERT_TRUE(f1.configEqual(f2.getBootstrapConfig())); f2._configured = false; @@ -320,7 +321,8 @@ TEST_FFF("require that proton config fetcher follows changes to doctypes", ConfigTestFixture("search"), ProtonConfigOwner(), ProtonConfigFetcher(f1.transport.transport(), ConfigUri(f1.configId, f1.context), f2, 60s)) { - f3.start(f1.transport.threadPool()); + FastOS_ThreadPool pool; + f3.start(pool); f2._configured = false; f1.addDocType("typea"); @@ -340,7 +342,8 @@ TEST_FFF("require that proton config fetcher reconfigures dbowners", ConfigTestFixture("search"), ProtonConfigOwner(), ProtonConfigFetcher(f1.transport.transport(), ConfigUri(f1.configId, f1.context), f2, 60s)) { - f3.start(f1.transport.threadPool()); + FastOS_ThreadPool pool; + f3.start(pool); ASSERT_FALSE(f2.getDocumentDBConfig("typea")); // Add db and verify that config for db is provided diff --git a/searchcore/src/vespa/searchcore/proton/server/rpc_hooks.cpp b/searchcore/src/vespa/searchcore/proton/server/rpc_hooks.cpp index 587da244937..8ab71637684 100644 --- a/searchcore/src/vespa/searchcore/proton/server/rpc_hooks.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/rpc_hooks.cpp @@ -136,7 +136,7 @@ RPCHooksBase::open(Params & params) initRPC(); _regAPI.registerName((params.identity + "/realtimecontroller").c_str()); _orb->Listen(params.rtcPort); - _transport->Start(&_proton.getThreadPool()); + _transport->Start(); LOG(debug, "started monitoring interface"); } diff --git a/searchcore/src/vespa/searchcore/proton/test/transport_helper.cpp b/searchcore/src/vespa/searchcore/proton/test/transport_helper.cpp index d0bc4dfd4d8..454a7093087 100644 --- a/searchcore/src/vespa/searchcore/proton/test/transport_helper.cpp +++ b/searchcore/src/vespa/searchcore/proton/test/transport_helper.cpp @@ -12,11 +12,10 @@ namespace proton { Transport::Transport() - : _threadPool(std::make_unique<FastOS_ThreadPool>()), - _transport(std::make_unique<FNET_Transport>()), + : _transport(std::make_unique<FNET_Transport>()), _clock(std::make_unique<vespalib::TestClock>()) { - _transport->Start(_threadPool.get()); + _transport->Start(); } Transport::~Transport() { diff --git a/searchcore/src/vespa/searchcore/proton/test/transport_helper.h b/searchcore/src/vespa/searchcore/proton/test/transport_helper.h index 46ca8131041..8a724009fc1 100644 --- a/searchcore/src/vespa/searchcore/proton/test/transport_helper.h +++ b/searchcore/src/vespa/searchcore/proton/test/transport_helper.h @@ -3,8 +3,6 @@ #include <vespa/searchcorespi/index/ithreadingservice.h> -class FastOS_ThreadPool; - namespace vespalib { class TestClock; } namespace proton { @@ -19,11 +17,9 @@ public: Transport(); virtual ~Transport(); FNET_Transport & transport() { return *_transport; } - FastOS_ThreadPool & threadPool() { return *_threadPool; } const vespalib::Clock & clock() const; virtual void shutdown(); private: - std::unique_ptr<FastOS_ThreadPool> _threadPool; std::unique_ptr<FNET_Transport> _transport; std::unique_ptr<vespalib::TestClock> _clock; }; diff --git a/searchlib/src/tests/transactionlog/translogclient_test.cpp b/searchlib/src/tests/transactionlog/translogclient_test.cpp index a1a42b592b2..cdf157b3fa0 100644 --- a/searchlib/src/tests/transactionlog/translogclient_test.cpp +++ b/searchlib/src/tests/transactionlog/translogclient_test.cpp @@ -480,16 +480,14 @@ getMaxSessionRunTime(TransLogServer &tls, const vespalib::string &domain) } struct TLS { - FastOS_ThreadPool threadPool; FNET_Transport transport; TransLogServer tls; TLS(const vespalib::string &name, int listenPort, const vespalib::string &baseDir, const common::FileHeaderContext &fileHeaderContext, const DomainConfig & cfg, size_t maxThreads = 4) - : threadPool(), - transport(), + : transport(), tls(transport, name, listenPort, baseDir, fileHeaderContext, cfg, maxThreads) { - transport.Start(&threadPool); + transport.Start(); } ~TLS() { transport.ShutDown(true); diff --git a/searchlib/src/tests/transactionlogstress/translogstress.cpp b/searchlib/src/tests/transactionlogstress/translogstress.cpp index f0ca11a2cc8..14a66c71c18 100644 --- a/searchlib/src/tests/transactionlogstress/translogstress.cpp +++ b/searchlib/src/tests/transactionlogstress/translogstress.cpp @@ -706,7 +706,6 @@ TransLogStress::main(int argc, char **argv) } // start transaction log server - FastOS_ThreadPool threadPool; FNET_Transport transport; DummyFileHeaderContext fileHeaderContext; TransLogServer tls(transport, "server", 17897, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(_cfg.domainPartSize)); @@ -759,8 +758,6 @@ TransLogStress::main(int argc, char **argv) std::cout << "</visitor>" << std::endl; } - threadPool.Close(); - return 0; } diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogclient.h b/searchlib/src/vespa/searchlib/transactionlog/translogclient.h index c3dcecf93b3..72a55f6ae77 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogclient.h +++ b/searchlib/src/vespa/searchlib/transactionlog/translogclient.h @@ -12,7 +12,6 @@ class FNET_Transport; class FRT_Supervisor; class FRT_Target; -class FastOS_ThreadPool; namespace vespalib { class ThreadStackExecutorBase; } namespace search::transactionlog::client { diff --git a/slobrok/src/tests/mirrorapi/mirrorapi.cpp b/slobrok/src/tests/mirrorapi/mirrorapi.cpp index 2ba54fdb9d0..568980152bd 100644 --- a/slobrok/src/tests/mirrorapi/mirrorapi.cpp +++ b/slobrok/src/tests/mirrorapi/mirrorapi.cpp @@ -139,12 +139,11 @@ Test::Main() cloud::config::SlobroksConfig::Slobrok slobrok; slobrok.connectionspec = "tcp/localhost:18501"; specBuilder.slobrok.push_back(slobrok); - FastOS_ThreadPool threadPool; FNET_Transport transport; FRT_Supervisor supervisor(&transport); MirrorAPI mirror(supervisor, slobrok::ConfiguratorFactory(config::ConfigUri::createFromInstance(specBuilder))); EXPECT_TRUE(!mirror.ready()); - transport.Start(&threadPool); + transport.Start(); std::this_thread::sleep_for(1s); a.reg(); diff --git a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp index e940dc71722..907918d97dc 100644 --- a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp +++ b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp @@ -66,8 +66,7 @@ SharedRpcResources::SharedRpcResources(const config::ConfigUri& config_uri, int rpc_server_port, size_t rpc_thread_pool_size, size_t rpc_events_before_wakeup) - : _thread_pool(std::make_unique<FastOS_ThreadPool>()), - _transport(std::make_unique<FNET_Transport>(fnet::TransportConfig(rpc_thread_pool_size). + : _transport(std::make_unique<FNET_Transport>(fnet::TransportConfig(rpc_thread_pool_size). events_before_wakeup(rpc_events_before_wakeup))), _orb(std::make_unique<FRT_Supervisor>(_transport.get())), _slobrok_register(std::make_unique<slobrok::api::RegisterAPI>(*_orb, slobrok::ConfiguratorFactory(config_uri))), @@ -92,7 +91,7 @@ void SharedRpcResources::start_server_and_register_slobrok(vespalib::stringref m if (!_orb->Listen(_rpc_server_port)) { throw IllegalStateException(fmt("Failed to listen to RPC port %d", _rpc_server_port), VESPA_STRLOC); } - _transport->Start(_thread_pool.get()); + _transport->Start(); _slobrok_register->registerName(my_handle); wait_until_slobrok_is_ready(); _handle = my_handle; diff --git a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h index a30fcdc4ea7..953492089c1 100644 --- a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h +++ b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h @@ -6,7 +6,6 @@ #include <vespa/vespalib/stllike/string.h> #include <memory> -class FastOS_ThreadPool; class FNET_Transport; class FRT_Supervisor; @@ -19,7 +18,6 @@ namespace storage::rpc { class SharedRpcResources { class RpcTargetFactoryImpl; - std::unique_ptr<FastOS_ThreadPool> _thread_pool; std::unique_ptr<FNET_Transport> _transport; std::unique_ptr<FRT_Supervisor> _orb; std::unique_ptr<slobrok::api::RegisterAPI> _slobrok_register; diff --git a/storage/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.h b/storage/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.h index 07b2dd78ed9..4319b4a0efe 100644 --- a/storage/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.h +++ b/storage/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.h @@ -4,8 +4,6 @@ #include <vespa/storageframework/generic/thread/threadpool.h> -class FastOS_ThreadPool; - namespace storage::framework { struct Clock; } |