From a7a0764e7e785f12ed40cd45d420e1b81991d402 Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Mon, 30 Nov 2020 11:39:32 +0000 Subject: provide the config in the constructor only. --- fnet/src/tests/connect/connect_test.cpp | 4 +- .../tests/frt/parallel_rpc/parallel_rpc_test.cpp | 2 +- fnet/src/vespa/fnet/config.cpp | 3 +- fnet/src/vespa/fnet/config.h | 22 ++++- fnet/src/vespa/fnet/connection.cpp | 4 +- fnet/src/vespa/fnet/frt/supervisor.cpp | 2 +- fnet/src/vespa/fnet/iocomponent.cpp | 6 +- fnet/src/vespa/fnet/iocomponent.h | 2 +- fnet/src/vespa/fnet/transport.cpp | 62 ++++++-------- fnet/src/vespa/fnet/transport.h | 97 ++++++++-------------- fnet/src/vespa/fnet/transport_thread.cpp | 13 +-- fnet/src/vespa/fnet/transport_thread.h | 57 +------------ 12 files changed, 101 insertions(+), 173 deletions(-) (limited to 'fnet/src') diff --git a/fnet/src/tests/connect/connect_test.cpp b/fnet/src/tests/connect/connect_test.cpp index 4d92a4d66fc..2b4a2bbe9f0 100644 --- a/fnet/src/tests/connect/connect_test.cpp +++ b/fnet/src/tests/connect/connect_test.cpp @@ -95,13 +95,13 @@ struct TransportFixture : FNET_IPacketHandler, FNET_IConnectionCleanupHandler { transport.Start(&pool); } TransportFixture(AsyncResolver::HostResolver::SP host_resolver) - : streamer(nullptr), pool(128 * 1024), transport(make_resolver(std::move(host_resolver)), 1), + : streamer(nullptr), pool(128 * 1024), transport(TransportConfig().resolver(make_resolver(std::move(host_resolver)))), conn_lost(), conn_deleted() { transport.Start(&pool); } TransportFixture(CryptoEngine::SP crypto) - : streamer(nullptr), pool(128 * 1024), transport(crypto, 1), + : streamer(nullptr), pool(128 * 1024), transport(TransportConfig().crypto(std::move(crypto))), conn_lost(), conn_deleted() { transport.Start(&pool); diff --git a/fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp b/fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp index ff85b389780..2577b4e6155 100644 --- a/fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp +++ b/fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp @@ -18,7 +18,7 @@ struct Rpc : FRT_Invokable { FNET_Transport transport; FRT_Supervisor orb; Rpc(CryptoEngine::SP crypto, size_t num_threads) - : thread_pool(128 * 1024), transport(crypto, num_threads), orb(&transport) {} + : thread_pool(128 * 1024), transport(TransportConfig(num_threads).crypto(std::move(crypto))), orb(&transport) {} void start() { ASSERT_TRUE(transport.Start(&thread_pool)); } diff --git a/fnet/src/vespa/fnet/config.cpp b/fnet/src/vespa/fnet/config.cpp index a546d38f78b..aee9ac6f2c1 100644 --- a/fnet/src/vespa/fnet/config.cpp +++ b/fnet/src/vespa/fnet/config.cpp @@ -3,7 +3,8 @@ #include "config.h" FNET_Config::FNET_Config() - : _iocTimeOut(0), + : _events_before_wakeup(1), + _iocTimeOut(0), _maxInputBufferSize(0x10000), _maxOutputBufferSize(0x10000), _tcpNoDelay(true) diff --git a/fnet/src/vespa/fnet/config.h b/fnet/src/vespa/fnet/config.h index 3f34c1511b6..1dc372c98a7 100644 --- a/fnet/src/vespa/fnet/config.h +++ b/fnet/src/vespa/fnet/config.h @@ -11,10 +11,30 @@ class FNET_Config { public: + uint32_t _events_before_wakeup; uint32_t _iocTimeOut; uint32_t _maxInputBufferSize; uint32_t _maxOutputBufferSize; bool _tcpNoDelay; - FNET_Config(); + explicit FNET_Config(); + FNET_Config & events_before_wakeup(uint32_t v) { + if (v > 1) { + _events_before_wakeup = v; + } + return *this; + } + FNET_Config & maxInputBufferSize(uint32_t v) { + _maxInputBufferSize = v; + return *this; + } + FNET_Config & maxOutputBufferSize(uint32_t v) { + _maxOutputBufferSize = v; + return *this; + } + FNET_Config & tcpNoDelay(bool v) { + _tcpNoDelay = v; + return *this; + } + }; diff --git a/fnet/src/vespa/fnet/connection.cpp b/fnet/src/vespa/fnet/connection.cpp index c5afd627a5a..a2e6fe25edc 100644 --- a/fnet/src/vespa/fnet/connection.cpp +++ b/fnet/src/vespa/fnet/connection.cpp @@ -347,7 +347,7 @@ done_read: } UpdateTimeOut(); - uint32_t maxSize = GetConfig()->_maxInputBufferSize; + uint32_t maxSize = getConfig()._maxInputBufferSize; if (maxSize > 0 && _input.GetBufSize() > maxSize) { if (!_flags._gotheader || _packetLength < maxSize) { @@ -430,7 +430,7 @@ FNET_Connection::Write() } } - uint32_t maxSize = GetConfig()->_maxOutputBufferSize; + uint32_t maxSize = getConfig()._maxOutputBufferSize; if (maxSize > 0 && _output.GetBufSize() > maxSize) { _output.Shrink(maxSize); } diff --git a/fnet/src/vespa/fnet/frt/supervisor.cpp b/fnet/src/vespa/fnet/frt/supervisor.cpp index 7e92855583e..388d754ece4 100644 --- a/fnet/src/vespa/fnet/frt/supervisor.cpp +++ b/fnet/src/vespa/fnet/frt/supervisor.cpp @@ -419,7 +419,7 @@ StandaloneFRT::StandaloneFRT() StandaloneFRT::StandaloneFRT(vespalib::CryptoEngine::SP crypto) : _threadPool(std::make_unique(1024*128)), - _transport(std::make_unique(std::move(crypto), 1)), + _transport(std::make_unique(TransportConfig().crypto(std::move(crypto)))), _supervisor(std::make_unique(_transport.get())) { _transport->Start(_threadPool.get()); diff --git a/fnet/src/vespa/fnet/iocomponent.cpp b/fnet/src/vespa/fnet/iocomponent.cpp index 81d1bf47567..7afab188b66 100644 --- a/fnet/src/vespa/fnet/iocomponent.cpp +++ b/fnet/src/vespa/fnet/iocomponent.cpp @@ -32,9 +32,9 @@ FNET_IOComponent::~FNET_IOComponent() assert(_ioc_selector == nullptr); } -FNET_Config * -FNET_IOComponent::GetConfig() { - return _ioc_owner->GetConfig(); +const FNET_Config & +FNET_IOComponent::getConfig() const { + return _ioc_owner->getConfig(); } void diff --git a/fnet/src/vespa/fnet/iocomponent.h b/fnet/src/vespa/fnet/iocomponent.h index 84e3c8bd412..5fd5079a8af 100644 --- a/fnet/src/vespa/fnet/iocomponent.h +++ b/fnet/src/vespa/fnet/iocomponent.h @@ -138,7 +138,7 @@ public: * * @return config object. **/ - FNET_Config *GetConfig(); + const FNET_Config & getConfig() const; /** diff --git a/fnet/src/vespa/fnet/transport.cpp b/fnet/src/vespa/fnet/transport.cpp index e59186069ce..ce5f44efb7c 100644 --- a/fnet/src/vespa/fnet/transport.cpp +++ b/fnet/src/vespa/fnet/transport.cpp @@ -25,15 +25,33 @@ VESPA_THREAD_STACK_TAG(fnet_work_pool); } // namespace -FNET_Transport::FNET_Transport(vespalib::AsyncResolver::SP resolver, vespalib::CryptoEngine::SP crypto, size_t num_threads) - : _async_resolver(std::move(resolver)), - _crypto_engine(std::move(crypto)), +TransportConfig::TransportConfig(int num_threads) + : _config(), + _resolver(), + _crypto(), + _num_threads(num_threads) +{} + +TransportConfig::~TransportConfig() = default; + +vespalib::AsyncResolver::SP +TransportConfig::resolver() const { + return _resolver ? _resolver : vespalib::AsyncResolver::get_shared(); +} +vespalib::CryptoEngine::SP +TransportConfig::crypto() const { + return _crypto ? _crypto : vespalib::CryptoEngine::get_default(); +} + +FNET_Transport::FNET_Transport(TransportConfig cfg) + : _async_resolver(cfg.resolver()), + _crypto_engine(cfg.crypto()), _work_pool(std::make_unique(1, 128 * 1024, fnet_work_pool, 1024)), _threads(), - _events_before_wakeup(1) + _config(cfg.config()) { - assert(num_threads >= 1); - for (size_t i = 0; i < num_threads; ++i) { + assert(cfg.num_threads() >= 1); + for (size_t i = 0; i < cfg.num_threads(); ++i) { _threads.emplace_back(std::make_unique(*this)); } } @@ -103,38 +121,6 @@ FNET_Transport::GetNumIOComponents() return result; } -void -FNET_Transport::SetIOCTimeOut(uint32_t ms) -{ - for (const auto &thread: _threads) { - thread->SetIOCTimeOut(ms); - } -} - -void -FNET_Transport::SetMaxInputBufferSize(uint32_t bytes) -{ - for (const auto &thread: _threads) { - thread->SetMaxInputBufferSize(bytes); - } -} - -void -FNET_Transport::SetMaxOutputBufferSize(uint32_t bytes) -{ - for (const auto &thread: _threads) { - thread->SetMaxOutputBufferSize(bytes); - } -} - -void -FNET_Transport::SetTCPNoDelay(bool noDelay) -{ - for (const auto &thread: _threads) { - thread->SetTCPNoDelay(noDelay); - } -} - void FNET_Transport::sync() { diff --git a/fnet/src/vespa/fnet/transport.h b/fnet/src/vespa/fnet/transport.h index 4acac4d6c6e..0ac7f473789 100644 --- a/fnet/src/vespa/fnet/transport.h +++ b/fnet/src/vespa/fnet/transport.h @@ -17,6 +17,34 @@ class FNET_IServerAdapter; class FNET_IPacketHandler; class FNET_Scheduler; +class TransportConfig { +public: + TransportConfig() : TransportConfig(1) {} + explicit TransportConfig(int num_threads); + ~TransportConfig(); + vespalib::AsyncResolver::SP resolver() const; + vespalib::CryptoEngine::SP crypto() const; + TransportConfig & resolver(vespalib::AsyncResolver::SP resolver_in) { + _resolver = std::move(resolver_in); + return *this; + } + TransportConfig & crypto(vespalib::CryptoEngine::SP crypto_in) { + _crypto = std::move(crypto_in); + return *this; + } + TransportConfig & config(const FNET_Config & config_in) { + _config = config_in; + return *this; + } + const FNET_Config & config() const { return _config; } + FNET_Config & config() { return _config; } + uint32_t num_threads() const { return _num_threads; } +private: + FNET_Config _config; + vespalib::AsyncResolver::SP _resolver; + vespalib::CryptoEngine::SP _crypto; + uint32_t _num_threads; +}; /** * This class represents the transport layer and handles a collection * of transport threads. Note: remember to shut down your transport @@ -31,8 +59,8 @@ private: vespalib::AsyncResolver::SP _async_resolver; vespalib::CryptoEngine::SP _crypto_engine; std::unique_ptr _work_pool; - Threads _threads; - size_t _events_before_wakeup; + Threads _threads; + FNET_Config _config; public: /** @@ -42,22 +70,15 @@ public: * the current thread become the transport thread. Main may only * be called for single-threaded transports. **/ - FNET_Transport(vespalib::AsyncResolver::SP resolver, vespalib::CryptoEngine::SP crypto, size_t num_threads); + FNET_Transport(TransportConfig config); - FNET_Transport(vespalib::AsyncResolver::SP resolver, size_t num_threads) - : FNET_Transport(std::move(resolver), vespalib::CryptoEngine::get_default(), num_threads) {} - FNET_Transport(vespalib::CryptoEngine::SP crypto, size_t num_threads) - : FNET_Transport(vespalib::AsyncResolver::get_shared(), std::move(crypto), num_threads) {} - FNET_Transport(size_t num_threads) - : FNET_Transport(vespalib::AsyncResolver::get_shared(), vespalib::CryptoEngine::get_default(), num_threads) {} + explicit FNET_Transport(uint32_t num_threads) + : FNET_Transport(TransportConfig(num_threads)) {} FNET_Transport() - : FNET_Transport(vespalib::AsyncResolver::get_shared(), vespalib::CryptoEngine::get_default(), 1) {} + : FNET_Transport(TransportConfig()) {} ~FNET_Transport(); - size_t events_before_wakeup() const { return _events_before_wakeup; } - void events_before_wakeup(size_t events_before_wakeup_in) { - _events_before_wakeup = events_before_wakeup_in; - } + const FNET_Config & getConfig() const { return _config; } /** * Try to execute the given task on the internal work pool @@ -173,54 +194,6 @@ public: **/ uint32_t GetNumIOComponents(); - /** - * Set the I/O Component timeout. Idle I/O Components with timeout - * enabled (determined by calling the ShouldTimeOut method) will - * time out if idle for the given number of milliseconds. An I/O - * component reports its un-idle-ness by calling the UpdateTimeOut - * method in the owning transport object. Calling this method with 0 - * as parameter will disable I/O Component timeouts. Note that newly - * created transport objects begin their lives with I/O Component - * timeouts disabled. An I/O Component timeout has the same effect - * as calling the Close method in the transport object with the - * target I/O Component as parameter. - * - * @param ms number of milliseconds before IOC idle timeout occurs. - **/ - void SetIOCTimeOut(uint32_t ms); - - /** - * Set maximum input buffer size. This value will only affect - * connections that use a common input buffer when decoding - * incoming packets. Note that this value is not an absolute - * max. The buffer will still grow larger than this value if - * needed to decode big packets. However, when the buffer becomes - * larger than this value, it will be shrunk back when possible. - * - * @param bytes buffer size in bytes. 0 means unlimited. - **/ - void SetMaxInputBufferSize(uint32_t bytes); - - /** - * Set maximum output buffer size. This value will only affect - * connections that use a common output buffer when encoding - * outgoing packets. Note that this value is not an absolute - * max. The buffer will still grow larger than this value if needed - * to encode big packets. However, when the buffer becomes larger - * than this value, it will be shrunk back when possible. - * - * @param bytes buffer size in bytes. 0 means unlimited. - **/ - void SetMaxOutputBufferSize(uint32_t bytes); - - /** - * Enable or disable use of the TCP_NODELAY flag with sockets - * created by this transport object. - * - * @param noDelay true if TCP_NODELAY flag should be used. - **/ - void SetTCPNoDelay(bool noDelay); - /** * Synchronize with all transport threads. This method will block * until all events posted before this method was invoked has been diff --git a/fnet/src/vespa/fnet/transport_thread.cpp b/fnet/src/vespa/fnet/transport_thread.cpp index d61eaffa24f..6b66b1ebe4b 100644 --- a/fnet/src/vespa/fnet/transport_thread.cpp +++ b/fnet/src/vespa/fnet/transport_thread.cpp @@ -125,7 +125,7 @@ FNET_TransportThread::PostEvent(FNET_ControlPacket *cpacket, _queue.QueuePacket_NoLock(cpacket, context); qLen = _queue.GetPacketCnt_NoLock(); } - if (qLen == _owner.events_before_wakeup()) { + if (qLen == getConfig()._events_before_wakeup) { _selector.wakeup(); } return true; @@ -209,7 +209,6 @@ FNET_TransportThread::FNET_TransportThread(FNET_Transport &owner_in) : _owner(owner_in), _now(clock::now()), _scheduler(&_now), - _config(), _componentsHead(nullptr), _timeOutHead(nullptr), _componentsTail(nullptr), @@ -242,13 +241,17 @@ FNET_TransportThread::~FNET_TransportThread() } } +const FNET_Config & +FNET_TransportThread::getConfig() const { + return _owner.getConfig(); +} bool FNET_TransportThread::tune(SocketHandle &handle) const { handle.set_keepalive(true); handle.set_linger(true, 0); - handle.set_nodelay(_config._tcpNoDelay); + handle.set_nodelay(getConfig()._tcpNoDelay); return handle.set_blocking(false); } @@ -486,8 +489,8 @@ FNET_TransportThread::EventLoopIteration() _selector.dispatch(*this); // handle IOC time-outs - if (_config._iocTimeOut > 0) { - time_point oldest = (_now - std::chrono::milliseconds(_config._iocTimeOut)); + if (getConfig()._iocTimeOut > 0) { + time_point oldest = (_now - std::chrono::milliseconds(getConfig()._iocTimeOut)); while (_timeOutHead != nullptr && oldest > _timeOutHead->_ioc_timestamp) { diff --git a/fnet/src/vespa/fnet/transport_thread.h b/fnet/src/vespa/fnet/transport_thread.h index b4319d4e2bc..fd61d99a8ac 100644 --- a/fnet/src/vespa/fnet/transport_thread.h +++ b/fnet/src/vespa/fnet/transport_thread.h @@ -37,7 +37,6 @@ private: FNET_Transport &_owner; // owning transport layer time_point _now; // current time sampler FNET_Scheduler _scheduler; // transport thread scheduler - FNET_Config _config; // FNET configuration [static] FNET_IOComponent *_componentsHead; // I/O component list head FNET_IOComponent *_timeOutHead; // first IOC in list to time out FNET_IOComponent *_componentsTail; // I/O component list tail @@ -140,7 +139,7 @@ private: * * @return config object. **/ - FNET_Config *GetConfig() { return &_config; } + const FNET_Config & getConfig() const; void handle_add_cmd(FNET_IOComponent *ioc); @@ -269,60 +268,6 @@ public: **/ uint32_t GetNumIOComponents() { return _componentCnt; } - - /** - * Set the I/O Component timeout. Idle I/O Components with timeout - * enabled (determined by calling the ShouldTimeOut method) will - * time out if idle for the given number of milliseconds. An I/O - * component reports its un-idle-ness by calling the UpdateTimeOut - * method in the owning transport object. Calling this method with 0 - * as parameter will disable I/O Component timeouts. Note that newly - * created transport objects begin their lives with I/O Component - * timeouts disabled. An I/O Component timeout has the same effect - * as calling the Close method in the transport object with the - * target I/O Component as parameter. - * - * @param ms number of milliseconds before IOC idle timeout occurs. - **/ - void SetIOCTimeOut(uint32_t ms) { _config._iocTimeOut = ms; } - - - /** - * Set maximum input buffer size. This value will only affect - * connections that use a common input buffer when decoding - * incoming packets. Note that this value is not an absolute - * max. The buffer will still grow larger than this value if - * needed to decode big packets. However, when the buffer becomes - * larger than this value, it will be shrunk back when possible. - * - * @param bytes buffer size in bytes. 0 means unlimited. - **/ - void SetMaxInputBufferSize(uint32_t bytes) - { _config._maxInputBufferSize = bytes; } - - - /** - * Set maximum output buffer size. This value will only affect - * connections that use a common output buffer when encoding - * outgoing packets. Note that this value is not an absolute - * max. The buffer will still grow larger than this value if needed - * to encode big packets. However, when the buffer becomes larger - * than this value, it will be shrunk back when possible. - * - * @param bytes buffer size in bytes. 0 means unlimited. - **/ - void SetMaxOutputBufferSize(uint32_t bytes) - { _config._maxOutputBufferSize = bytes; } - - /** - * Enable or disable use of the TCP_NODELAY flag with sockets - * created by this transport object. - * - * @param noDelay true if TCP_NODELAY flag should be used. - **/ - void SetTCPNoDelay(bool noDelay) { _config._tcpNoDelay = noDelay; } - - /** * Add an I/O component to the working set of this transport * object. Note that the actual work is performed by the transport -- cgit v1.2.3