diff options
-rw-r--r-- | fnet/src/vespa/fnet/transport.cpp | 11 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/transport.h | 3 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/transport_thread.cpp | 1 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/transport_thread.h | 8 | ||||
-rw-r--r-- | messagebus/src/vespa/messagebus/network/rpcnetwork.cpp | 1 | ||||
-rw-r--r-- | messagebus/src/vespa/messagebus/network/rpcnetwork.h | 41 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/net/async_resolver.cpp | 10 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/net/async_resolver.h | 12 |
8 files changed, 47 insertions, 40 deletions
diff --git a/fnet/src/vespa/fnet/transport.cpp b/fnet/src/vespa/fnet/transport.cpp index d3b52969c8c..8543d648400 100644 --- a/fnet/src/vespa/fnet/transport.cpp +++ b/fnet/src/vespa/fnet/transport.cpp @@ -3,6 +3,7 @@ #include "transport.h" #include "transport_thread.h" #include "iocomponent.h" +#include <vespa/vespalib/util/threadstackexecutor.h> #include <chrono> #include <xxhash.h> @@ -27,12 +28,12 @@ VESPA_THREAD_STACK_TAG(fnet_work_pool); FNET_Transport::FNET_Transport(vespalib::AsyncResolver::SP resolver, vespalib::CryptoEngine::SP crypto, size_t num_threads) : _async_resolver(std::move(resolver)), _crypto_engine(std::move(crypto)), - _work_pool(1, 128 * 1024, fnet_work_pool, 1024), + _work_pool(std::make_unique<vespalib::ThreadStackExecutor>(1, 128 * 1024, fnet_work_pool, 1024)), _threads() { assert(num_threads >= 1); for (size_t i = 0; i < num_threads; ++i) { - _threads.emplace_back(new FNET_TransportThread(*this)); + _threads.emplace_back(std::make_unique<FNET_TransportThread>(*this)); } } @@ -41,7 +42,7 @@ FNET_Transport::~FNET_Transport() = default; void FNET_Transport::post_or_perform(vespalib::Executor::Task::UP task) { - if (auto rejected = _work_pool.execute(std::move(task))) { + if (auto rejected = _work_pool->execute(std::move(task))) { rejected->run(); } } @@ -161,7 +162,7 @@ FNET_Transport::ShutDown(bool waitFinished) } if (waitFinished) { _async_resolver->wait_for_pending_resolves(); - _work_pool.shutdown().sync(); + _work_pool->shutdown().sync(); } } @@ -172,7 +173,7 @@ FNET_Transport::WaitFinished() thread->WaitFinished(); } _async_resolver->wait_for_pending_resolves(); - _work_pool.shutdown().sync(); + _work_pool->shutdown().sync(); } bool diff --git a/fnet/src/vespa/fnet/transport.h b/fnet/src/vespa/fnet/transport.h index 02ef22c7fb6..29f1fb2144f 100644 --- a/fnet/src/vespa/fnet/transport.h +++ b/fnet/src/vespa/fnet/transport.h @@ -7,7 +7,6 @@ #include <vector> #include <vespa/vespalib/net/async_resolver.h> #include <vespa/vespalib/net/crypto_engine.h> -#include <vespa/vespalib/util/threadstackexecutor.h> class FNET_TransportThread; class FastOS_ThreadPool; @@ -30,7 +29,7 @@ private: vespalib::AsyncResolver::SP _async_resolver; vespalib::CryptoEngine::SP _crypto_engine; - vespalib::ThreadStackExecutor _work_pool; + std::unique_ptr<vespalib::SyncableThreadExecutor> _work_pool; Threads _threads; public: diff --git a/fnet/src/vespa/fnet/transport_thread.cpp b/fnet/src/vespa/fnet/transport_thread.cpp index a20de880f15..ba9c4bb7789 100644 --- a/fnet/src/vespa/fnet/transport_thread.cpp +++ b/fnet/src/vespa/fnet/transport_thread.cpp @@ -9,6 +9,7 @@ #include "transport.h" #include <vespa/vespalib/net/socket_spec.h> #include <vespa/vespalib/net/server_socket.h> +#include <vespa/vespalib/util/gate.h> #include <csignal> #include <vespa/log/log.h> diff --git a/fnet/src/vespa/fnet/transport_thread.h b/fnet/src/vespa/fnet/transport_thread.h index 8bced290962..966ffb849e0 100644 --- a/fnet/src/vespa/fnet/transport_thread.h +++ b/fnet/src/vespa/fnet/transport_thread.h @@ -54,10 +54,6 @@ private: bool _finished; // event loop stopped ? bool _waitFinished; // someone is waiting for _finished - FNET_TransportThread(const FNET_TransportThread &); - FNET_TransportThread &operator=(const FNET_TransportThread &); - - /** * Add an IOComponent to the list of components. This operation is * performed immidiately and without locking. This method should @@ -177,6 +173,8 @@ private: } public: + FNET_TransportThread(const FNET_TransportThread &) = delete; + FNET_TransportThread &operator=(const FNET_TransportThread &) = delete; /** * Construct a transport object. To activate your newly created * transport object you need to call either the Start method to @@ -192,7 +190,7 @@ public: * Destruct object. This should NOT be done before the transport * thread has completed it's work and raised the finished flag. **/ - ~FNET_TransportThread(); + ~FNET_TransportThread() override; /** diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp index 5872706c443..893b8d1d2ca 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp @@ -14,6 +14,7 @@ #include <vespa/vespalib/component/vtag.h> #include <vespa/vespalib/stllike/asciistream.h> #include <vespa/vespalib/util/stringfmt.h> +#include <vespa/vespalib/util/threadstackexecutor.h> #include <vespa/fnet/scheduler.h> #include <vespa/fnet/transport.h> #include <vespa/fnet/frt/supervisor.h> diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.h b/messagebus/src/vespa/messagebus/network/rpcnetwork.h index 2780c3e8770..d701358fc84 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetwork.h +++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.h @@ -15,6 +15,7 @@ #include <vespa/fnet/frt/invokable.h> class FNET_Transport; +class FastOS_ThreadPool; namespace slobrok { namespace api { class RegisterAPI; } @@ -52,26 +53,26 @@ private: using SendAdapterMap = std::map<vespalib::Version, RPCSendAdapter*>; - INetworkOwner *_owner; - Identity _ident; - std::unique_ptr<FastOS_ThreadPool> _threadPool; - std::unique_ptr<FNET_Transport> _transport; - std::unique_ptr<FRT_Supervisor> _orb; - FNET_Scheduler &_scheduler; - std::unique_ptr<slobrok::ConfiguratorFactory> _slobrokCfgFactory; - std::unique_ptr<slobrok::api::IMirrorAPI> _mirror; - std::unique_ptr<slobrok::api::RegisterAPI> _regAPI; - int _requestedPort; - std::unique_ptr<RPCTargetPool> _targetPool; - std::unique_ptr<FNET_Task> _targetPoolTask; - std::unique_ptr<RPCServicePool> _servicePool; - std::unique_ptr<vespalib::ThreadStackExecutor> _executor; - std::unique_ptr<RPCSendAdapter> _sendV1; - std::unique_ptr<RPCSendAdapter> _sendV2; - SendAdapterMap _sendAdapters; - CompressionConfig _compressionConfig; - bool _allowDispatchForEncode; - bool _allowDispatchForDecode; + INetworkOwner *_owner; + Identity _ident; + std::unique_ptr<FastOS_ThreadPool> _threadPool; + std::unique_ptr<FNET_Transport> _transport; + std::unique_ptr<FRT_Supervisor> _orb; + FNET_Scheduler &_scheduler; + std::unique_ptr<slobrok::ConfiguratorFactory> _slobrokCfgFactory; + std::unique_ptr<slobrok::api::IMirrorAPI> _mirror; + std::unique_ptr<slobrok::api::RegisterAPI> _regAPI; + int _requestedPort; + std::unique_ptr<RPCTargetPool> _targetPool; + std::unique_ptr<FNET_Task> _targetPoolTask; + std::unique_ptr<RPCServicePool> _servicePool; + std::unique_ptr<vespalib::SyncableThreadExecutor> _executor; + std::unique_ptr<RPCSendAdapter> _sendV1; + std::unique_ptr<RPCSendAdapter> _sendV2; + SendAdapterMap _sendAdapters; + CompressionConfig _compressionConfig; + bool _allowDispatchForEncode; + bool _allowDispatchForDecode; /** diff --git a/vespalib/src/vespa/vespalib/net/async_resolver.cpp b/vespalib/src/vespa/vespalib/net/async_resolver.cpp index bc918434077..08e1774b1dd 100644 --- a/vespalib/src/vespa/vespalib/net/async_resolver.cpp +++ b/vespalib/src/vespa/vespalib/net/async_resolver.cpp @@ -2,6 +2,7 @@ #include "async_resolver.h" #include "socket_spec.h" +#include <vespa/vespalib/util/threadstackexecutor.h> #include <vespa/log/log.h> LOG_SETUP(".vespalib.net.async_resolver"); @@ -149,15 +150,20 @@ AsyncResolver::SP AsyncResolver::_shared_resolver(nullptr); AsyncResolver::AsyncResolver(HostResolver::SP resolver, size_t num_threads) : _resolver(std::move(resolver)), - _executor(num_threads, 128*1024, async_resolver_executor_thread) + _executor(std::make_unique<ThreadStackExecutor>(num_threads, 128*1024, async_resolver_executor_thread)) { } void +AsyncResolver::wait_for_pending_resolves() { + _executor->sync(); +} + +void AsyncResolver::resolve_async(const vespalib::string &spec, ResultHandler::WP result_handler) { auto task = std::make_unique<ResolveTask>(spec, *_resolver, std::move(result_handler)); - auto rejected = _executor.execute(std::move(task)); + auto rejected = _executor->execute(std::move(task)); assert(!rejected); } diff --git a/vespalib/src/vespa/vespalib/net/async_resolver.h b/vespalib/src/vespa/vespalib/net/async_resolver.h index 590e6672922..f9f2079004b 100644 --- a/vespalib/src/vespa/vespalib/net/async_resolver.h +++ b/vespalib/src/vespa/vespalib/net/async_resolver.h @@ -4,7 +4,7 @@ #include "socket_address.h" #include "socket_spec.h" -#include <vespa/vespalib/util/threadstackexecutor.h> +#include <vespa/vespalib/util/threadexecutor.h> #include <vespa/vespalib/util/arrayqueue.hpp> #include <chrono> #include <memory> @@ -117,15 +117,15 @@ private: void run() override; }; - HostResolver::SP _resolver; - ThreadStackExecutor _executor; - static std::mutex _shared_lock; - static AsyncResolver::SP _shared_resolver; + HostResolver::SP _resolver; + std::unique_ptr<SyncableThreadExecutor> _executor; + static std::mutex _shared_lock; + static AsyncResolver::SP _shared_resolver; AsyncResolver(HostResolver::SP resolver, size_t num_threads); public: void resolve_async(const vespalib::string &spec, ResultHandler::WP result_handler); - void wait_for_pending_resolves() { _executor.sync(); } + void wait_for_pending_resolves(); static AsyncResolver::SP create(Params params); static AsyncResolver::SP get_shared(); }; |