diff options
-rw-r--r-- | fnet/src/vespa/fnet/transport.cpp | 14 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/transport.h | 9 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/transport_thread.cpp | 15 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/transport_thread.h | 14 | ||||
-rw-r--r-- | messagebus/src/vespa/messagebus/network/rpcnetwork.cpp | 1 | ||||
-rw-r--r-- | messagebus/src/vespa/messagebus/network/rpcnetwork.h | 41 | ||||
-rw-r--r-- | storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp | 1 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/net/async_resolver.cpp | 10 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/net/async_resolver.h | 12 |
9 files changed, 68 insertions, 49 deletions
diff --git a/fnet/src/vespa/fnet/transport.cpp b/fnet/src/vespa/fnet/transport.cpp index d3b52969c8c..e59186069ce 100644 --- a/fnet/src/vespa/fnet/transport.cpp +++ b/fnet/src/vespa/fnet/transport.cpp @@ -3,6 +3,7 @@ #include "transport.h" #include "transport_thread.h" #include "iocomponent.h" +#include <vespa/vespalib/util/threadstackexecutor.h> #include <chrono> #include <xxhash.h> @@ -27,12 +28,13 @@ VESPA_THREAD_STACK_TAG(fnet_work_pool); FNET_Transport::FNET_Transport(vespalib::AsyncResolver::SP resolver, vespalib::CryptoEngine::SP crypto, size_t num_threads) : _async_resolver(std::move(resolver)), _crypto_engine(std::move(crypto)), - _work_pool(1, 128 * 1024, fnet_work_pool, 1024), - _threads() + _work_pool(std::make_unique<vespalib::ThreadStackExecutor>(1, 128 * 1024, fnet_work_pool, 1024)), + _threads(), + _events_before_wakeup(1) { assert(num_threads >= 1); for (size_t i = 0; i < num_threads; ++i) { - _threads.emplace_back(new FNET_TransportThread(*this)); + _threads.emplace_back(std::make_unique<FNET_TransportThread>(*this)); } } @@ -41,7 +43,7 @@ FNET_Transport::~FNET_Transport() = default; void FNET_Transport::post_or_perform(vespalib::Executor::Task::UP task) { - if (auto rejected = _work_pool.execute(std::move(task))) { + if (auto rejected = _work_pool->execute(std::move(task))) { rejected->run(); } } @@ -161,7 +163,7 @@ FNET_Transport::ShutDown(bool waitFinished) } if (waitFinished) { _async_resolver->wait_for_pending_resolves(); - _work_pool.shutdown().sync(); + _work_pool->shutdown().sync(); } } @@ -172,7 +174,7 @@ FNET_Transport::WaitFinished() thread->WaitFinished(); } _async_resolver->wait_for_pending_resolves(); - _work_pool.shutdown().sync(); + _work_pool->shutdown().sync(); } bool diff --git a/fnet/src/vespa/fnet/transport.h b/fnet/src/vespa/fnet/transport.h index 02ef22c7fb6..1126f68e69f 100644 --- a/fnet/src/vespa/fnet/transport.h +++ b/fnet/src/vespa/fnet/transport.h @@ -7,7 +7,6 @@ #include <vector> #include <vespa/vespalib/net/async_resolver.h> #include <vespa/vespalib/net/crypto_engine.h> -#include <vespa/vespalib/util/threadstackexecutor.h> class FNET_TransportThread; class FastOS_ThreadPool; @@ -30,8 +29,9 @@ private: vespalib::AsyncResolver::SP _async_resolver; vespalib::CryptoEngine::SP _crypto_engine; - vespalib::ThreadStackExecutor _work_pool; + std::unique_ptr<vespalib::SyncableThreadExecutor> _work_pool; Threads _threads; + size_t _events_before_wakeup; public: /** @@ -53,6 +53,11 @@ public: : FNET_Transport(vespalib::AsyncResolver::get_shared(), vespalib::CryptoEngine::get_default(), 1) {} ~FNET_Transport(); + size_t events_before_wakeup() const { return _events_before_wakeup; } + void events_before_wakeup(size_t events_before_wakeup_in) { + _events_before_wakeup = events_before_wakeup_in; + } + /** * Try to execute the given task on the internal work pool * executor (post). If the executor has been closed or there is diff --git a/fnet/src/vespa/fnet/transport_thread.cpp b/fnet/src/vespa/fnet/transport_thread.cpp index a20de880f15..d61eaffa24f 100644 --- a/fnet/src/vespa/fnet/transport_thread.cpp +++ b/fnet/src/vespa/fnet/transport_thread.cpp @@ -9,6 +9,7 @@ #include "transport.h" #include <vespa/vespalib/net/socket_spec.h> #include <vespa/vespalib/net/server_socket.h> +#include <vespa/vespalib/util/gate.h> #include <csignal> #include <vespa/log/log.h> @@ -17,6 +18,7 @@ LOG_SETUP(".fnet"); using vespalib::ServerSocket; using vespalib::SocketHandle; using vespalib::SocketSpec; +using OptimizeFor = vespalib::Executor::OptimizeFor; namespace { @@ -112,7 +114,7 @@ bool FNET_TransportThread::PostEvent(FNET_ControlPacket *cpacket, FNET_Context context) { - bool wasEmpty; + size_t qLen; { std::unique_lock<std::mutex> guard(_lock); if (IsShutDown()) { @@ -120,10 +122,10 @@ FNET_TransportThread::PostEvent(FNET_ControlPacket *cpacket, SafeDiscardEvent(cpacket, context); return false; } - wasEmpty = _queue.IsEmpty_NoLock(); _queue.QueuePacket_NoLock(cpacket, context); + qLen = _queue.GetPacketCnt_NoLock(); } - if (wasEmpty) { + if (qLen == _owner.events_before_wakeup()) { _selector.wakeup(); } return true; @@ -396,9 +398,8 @@ FNET_TransportThread::InitEventLoop() return true; } - void -FNET_TransportThread::handle_wakeup() +FNET_TransportThread::handle_wakeup_events() { { std::lock_guard<std::mutex> guard(_lock); @@ -479,7 +480,9 @@ FNET_TransportThread::EventLoopIteration() // sample current time (performed once per event loop iteration) _now = clock::now(); - // handle wakeup and io-events + handle_wakeup_events(); + + // handle io-events _selector.dispatch(*this); // handle IOC time-outs diff --git a/fnet/src/vespa/fnet/transport_thread.h b/fnet/src/vespa/fnet/transport_thread.h index 8bced290962..b4319d4e2bc 100644 --- a/fnet/src/vespa/fnet/transport_thread.h +++ b/fnet/src/vespa/fnet/transport_thread.h @@ -54,10 +54,6 @@ private: bool _finished; // event loop stopped ? bool _waitFinished; // someone is waiting for _finished - FNET_TransportThread(const FNET_TransportThread &); - FNET_TransportThread &operator=(const FNET_TransportThread &); - - /** * Add an IOComponent to the list of components. This operation is * performed immidiately and without locking. This method should @@ -176,7 +172,11 @@ private: return _shutdown.load(std::memory_order_relaxed); } + void handle_wakeup_events(); + public: + FNET_TransportThread(const FNET_TransportThread &) = delete; + FNET_TransportThread &operator=(const FNET_TransportThread &) = delete; /** * Construct a transport object. To activate your newly created * transport object you need to call either the Start method to @@ -192,7 +192,7 @@ public: * Destruct object. This should NOT be done before the transport * thread has completed it's work and raised the finished flag. **/ - ~FNET_TransportThread(); + ~FNET_TransportThread() override; /** @@ -458,8 +458,8 @@ public: void WaitFinished(); - // selector call-back for selector wakeup - void handle_wakeup(); + // Empty selector call-back for selector wakeup + void handle_wakeup() { } // selector call-back for io-events void handle_event(FNET_IOComponent &ctx, bool read, bool write); diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp index 5872706c443..893b8d1d2ca 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp @@ -14,6 +14,7 @@ #include <vespa/vespalib/component/vtag.h> #include <vespa/vespalib/stllike/asciistream.h> #include <vespa/vespalib/util/stringfmt.h> +#include <vespa/vespalib/util/threadstackexecutor.h> #include <vespa/fnet/scheduler.h> #include <vespa/fnet/transport.h> #include <vespa/fnet/frt/supervisor.h> diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.h b/messagebus/src/vespa/messagebus/network/rpcnetwork.h index 2780c3e8770..d701358fc84 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetwork.h +++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.h @@ -15,6 +15,7 @@ #include <vespa/fnet/frt/invokable.h> class FNET_Transport; +class FastOS_ThreadPool; namespace slobrok { namespace api { class RegisterAPI; } @@ -52,26 +53,26 @@ private: using SendAdapterMap = std::map<vespalib::Version, RPCSendAdapter*>; - INetworkOwner *_owner; - Identity _ident; - std::unique_ptr<FastOS_ThreadPool> _threadPool; - std::unique_ptr<FNET_Transport> _transport; - std::unique_ptr<FRT_Supervisor> _orb; - FNET_Scheduler &_scheduler; - std::unique_ptr<slobrok::ConfiguratorFactory> _slobrokCfgFactory; - std::unique_ptr<slobrok::api::IMirrorAPI> _mirror; - std::unique_ptr<slobrok::api::RegisterAPI> _regAPI; - int _requestedPort; - std::unique_ptr<RPCTargetPool> _targetPool; - std::unique_ptr<FNET_Task> _targetPoolTask; - std::unique_ptr<RPCServicePool> _servicePool; - std::unique_ptr<vespalib::ThreadStackExecutor> _executor; - std::unique_ptr<RPCSendAdapter> _sendV1; - std::unique_ptr<RPCSendAdapter> _sendV2; - SendAdapterMap _sendAdapters; - CompressionConfig _compressionConfig; - bool _allowDispatchForEncode; - bool _allowDispatchForDecode; + INetworkOwner *_owner; + Identity _ident; + std::unique_ptr<FastOS_ThreadPool> _threadPool; + std::unique_ptr<FNET_Transport> _transport; + std::unique_ptr<FRT_Supervisor> _orb; + FNET_Scheduler &_scheduler; + std::unique_ptr<slobrok::ConfiguratorFactory> _slobrokCfgFactory; + std::unique_ptr<slobrok::api::IMirrorAPI> _mirror; + std::unique_ptr<slobrok::api::RegisterAPI> _regAPI; + int _requestedPort; + std::unique_ptr<RPCTargetPool> _targetPool; + std::unique_ptr<FNET_Task> _targetPoolTask; + std::unique_ptr<RPCServicePool> _servicePool; + std::unique_ptr<vespalib::SyncableThreadExecutor> _executor; + std::unique_ptr<RPCSendAdapter> _sendV1; + std::unique_ptr<RPCSendAdapter> _sendV2; + SendAdapterMap _sendAdapters; + CompressionConfig _compressionConfig; + bool _allowDispatchForEncode; + bool _allowDispatchForDecode; /** diff --git a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp index f8ee5afd1fd..b300080db9e 100644 --- a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp +++ b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp @@ -73,6 +73,7 @@ SharedRpcResources::SharedRpcResources(const config::ConfigUri& config_uri, _rpc_server_port(rpc_server_port), _shutdown(false) { + _transport->events_before_wakeup(100); } // TODO make sure init/shutdown is safe for aborted init in comm. mgr. diff --git a/vespalib/src/vespa/vespalib/net/async_resolver.cpp b/vespalib/src/vespa/vespalib/net/async_resolver.cpp index bc918434077..08e1774b1dd 100644 --- a/vespalib/src/vespa/vespalib/net/async_resolver.cpp +++ b/vespalib/src/vespa/vespalib/net/async_resolver.cpp @@ -2,6 +2,7 @@ #include "async_resolver.h" #include "socket_spec.h" +#include <vespa/vespalib/util/threadstackexecutor.h> #include <vespa/log/log.h> LOG_SETUP(".vespalib.net.async_resolver"); @@ -149,15 +150,20 @@ AsyncResolver::SP AsyncResolver::_shared_resolver(nullptr); AsyncResolver::AsyncResolver(HostResolver::SP resolver, size_t num_threads) : _resolver(std::move(resolver)), - _executor(num_threads, 128*1024, async_resolver_executor_thread) + _executor(std::make_unique<ThreadStackExecutor>(num_threads, 128*1024, async_resolver_executor_thread)) { } void +AsyncResolver::wait_for_pending_resolves() { + _executor->sync(); +} + +void AsyncResolver::resolve_async(const vespalib::string &spec, ResultHandler::WP result_handler) { auto task = std::make_unique<ResolveTask>(spec, *_resolver, std::move(result_handler)); - auto rejected = _executor.execute(std::move(task)); + auto rejected = _executor->execute(std::move(task)); assert(!rejected); } diff --git a/vespalib/src/vespa/vespalib/net/async_resolver.h b/vespalib/src/vespa/vespalib/net/async_resolver.h index 590e6672922..f9f2079004b 100644 --- a/vespalib/src/vespa/vespalib/net/async_resolver.h +++ b/vespalib/src/vespa/vespalib/net/async_resolver.h @@ -4,7 +4,7 @@ #include "socket_address.h" #include "socket_spec.h" -#include <vespa/vespalib/util/threadstackexecutor.h> +#include <vespa/vespalib/util/threadexecutor.h> #include <vespa/vespalib/util/arrayqueue.hpp> #include <chrono> #include <memory> @@ -117,15 +117,15 @@ private: void run() override; }; - HostResolver::SP _resolver; - ThreadStackExecutor _executor; - static std::mutex _shared_lock; - static AsyncResolver::SP _shared_resolver; + HostResolver::SP _resolver; + std::unique_ptr<SyncableThreadExecutor> _executor; + static std::mutex _shared_lock; + static AsyncResolver::SP _shared_resolver; AsyncResolver(HostResolver::SP resolver, size_t num_threads); public: void resolve_async(const vespalib::string &spec, ResultHandler::WP result_handler); - void wait_for_pending_resolves() { _executor.sync(); } + void wait_for_pending_resolves(); static AsyncResolver::SP create(Params params); static AsyncResolver::SP get_shared(); }; |