aboutsummaryrefslogtreecommitdiffstats
path: root/fnet
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-11-30 11:39:32 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2020-12-01 13:45:48 +0000
commita7a0764e7e785f12ed40cd45d420e1b81991d402 (patch)
tree34d1caf0842f7be3620b8d49e1f48f21adc67db7 /fnet
parentef1c43d5433e4cf5ae7dd581f1c72c46e5611118 (diff)
provide the config in the constructor only.
Diffstat (limited to 'fnet')
-rw-r--r--fnet/src/tests/connect/connect_test.cpp4
-rw-r--r--fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp2
-rw-r--r--fnet/src/vespa/fnet/config.cpp3
-rw-r--r--fnet/src/vespa/fnet/config.h22
-rw-r--r--fnet/src/vespa/fnet/connection.cpp4
-rw-r--r--fnet/src/vespa/fnet/frt/supervisor.cpp2
-rw-r--r--fnet/src/vespa/fnet/iocomponent.cpp6
-rw-r--r--fnet/src/vespa/fnet/iocomponent.h2
-rw-r--r--fnet/src/vespa/fnet/transport.cpp62
-rw-r--r--fnet/src/vespa/fnet/transport.h97
-rw-r--r--fnet/src/vespa/fnet/transport_thread.cpp13
-rw-r--r--fnet/src/vespa/fnet/transport_thread.h57
12 files changed, 101 insertions, 173 deletions
diff --git a/fnet/src/tests/connect/connect_test.cpp b/fnet/src/tests/connect/connect_test.cpp
index 4d92a4d66fc..2b4a2bbe9f0 100644
--- a/fnet/src/tests/connect/connect_test.cpp
+++ b/fnet/src/tests/connect/connect_test.cpp
@@ -95,13 +95,13 @@ struct TransportFixture : FNET_IPacketHandler, FNET_IConnectionCleanupHandler {
transport.Start(&pool);
}
TransportFixture(AsyncResolver::HostResolver::SP host_resolver)
- : streamer(nullptr), pool(128 * 1024), transport(make_resolver(std::move(host_resolver)), 1),
+ : streamer(nullptr), pool(128 * 1024), transport(TransportConfig().resolver(make_resolver(std::move(host_resolver)))),
conn_lost(), conn_deleted()
{
transport.Start(&pool);
}
TransportFixture(CryptoEngine::SP crypto)
- : streamer(nullptr), pool(128 * 1024), transport(crypto, 1),
+ : streamer(nullptr), pool(128 * 1024), transport(TransportConfig().crypto(std::move(crypto))),
conn_lost(), conn_deleted()
{
transport.Start(&pool);
diff --git a/fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp b/fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp
index ff85b389780..2577b4e6155 100644
--- a/fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp
+++ b/fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp
@@ -18,7 +18,7 @@ struct Rpc : FRT_Invokable {
FNET_Transport transport;
FRT_Supervisor orb;
Rpc(CryptoEngine::SP crypto, size_t num_threads)
- : thread_pool(128 * 1024), transport(crypto, num_threads), orb(&transport) {}
+ : thread_pool(128 * 1024), transport(TransportConfig(num_threads).crypto(std::move(crypto))), orb(&transport) {}
void start() {
ASSERT_TRUE(transport.Start(&thread_pool));
}
diff --git a/fnet/src/vespa/fnet/config.cpp b/fnet/src/vespa/fnet/config.cpp
index a546d38f78b..aee9ac6f2c1 100644
--- a/fnet/src/vespa/fnet/config.cpp
+++ b/fnet/src/vespa/fnet/config.cpp
@@ -3,7 +3,8 @@
#include "config.h"
FNET_Config::FNET_Config()
- : _iocTimeOut(0),
+ : _events_before_wakeup(1),
+ _iocTimeOut(0),
_maxInputBufferSize(0x10000),
_maxOutputBufferSize(0x10000),
_tcpNoDelay(true)
diff --git a/fnet/src/vespa/fnet/config.h b/fnet/src/vespa/fnet/config.h
index 3f34c1511b6..1dc372c98a7 100644
--- a/fnet/src/vespa/fnet/config.h
+++ b/fnet/src/vespa/fnet/config.h
@@ -11,10 +11,30 @@
class FNET_Config
{
public:
+ uint32_t _events_before_wakeup;
uint32_t _iocTimeOut;
uint32_t _maxInputBufferSize;
uint32_t _maxOutputBufferSize;
bool _tcpNoDelay;
- FNET_Config();
+ explicit FNET_Config();
+ FNET_Config & events_before_wakeup(uint32_t v) {
+ if (v > 1) {
+ _events_before_wakeup = v;
+ }
+ return *this;
+ }
+ FNET_Config & maxInputBufferSize(uint32_t v) {
+ _maxInputBufferSize = v;
+ return *this;
+ }
+ FNET_Config & maxOutputBufferSize(uint32_t v) {
+ _maxOutputBufferSize = v;
+ return *this;
+ }
+ FNET_Config & tcpNoDelay(bool v) {
+ _tcpNoDelay = v;
+ return *this;
+ }
+
};
diff --git a/fnet/src/vespa/fnet/connection.cpp b/fnet/src/vespa/fnet/connection.cpp
index c5afd627a5a..a2e6fe25edc 100644
--- a/fnet/src/vespa/fnet/connection.cpp
+++ b/fnet/src/vespa/fnet/connection.cpp
@@ -347,7 +347,7 @@ done_read:
}
UpdateTimeOut();
- uint32_t maxSize = GetConfig()->_maxInputBufferSize;
+ uint32_t maxSize = getConfig()._maxInputBufferSize;
if (maxSize > 0 && _input.GetBufSize() > maxSize)
{
if (!_flags._gotheader || _packetLength < maxSize) {
@@ -430,7 +430,7 @@ FNET_Connection::Write()
}
}
- uint32_t maxSize = GetConfig()->_maxOutputBufferSize;
+ uint32_t maxSize = getConfig()._maxOutputBufferSize;
if (maxSize > 0 && _output.GetBufSize() > maxSize) {
_output.Shrink(maxSize);
}
diff --git a/fnet/src/vespa/fnet/frt/supervisor.cpp b/fnet/src/vespa/fnet/frt/supervisor.cpp
index 7e92855583e..388d754ece4 100644
--- a/fnet/src/vespa/fnet/frt/supervisor.cpp
+++ b/fnet/src/vespa/fnet/frt/supervisor.cpp
@@ -419,7 +419,7 @@ StandaloneFRT::StandaloneFRT()
StandaloneFRT::StandaloneFRT(vespalib::CryptoEngine::SP crypto)
: _threadPool(std::make_unique<FastOS_ThreadPool>(1024*128)),
- _transport(std::make_unique<FNET_Transport>(std::move(crypto), 1)),
+ _transport(std::make_unique<FNET_Transport>(TransportConfig().crypto(std::move(crypto)))),
_supervisor(std::make_unique<FRT_Supervisor>(_transport.get()))
{
_transport->Start(_threadPool.get());
diff --git a/fnet/src/vespa/fnet/iocomponent.cpp b/fnet/src/vespa/fnet/iocomponent.cpp
index 81d1bf47567..7afab188b66 100644
--- a/fnet/src/vespa/fnet/iocomponent.cpp
+++ b/fnet/src/vespa/fnet/iocomponent.cpp
@@ -32,9 +32,9 @@ FNET_IOComponent::~FNET_IOComponent()
assert(_ioc_selector == nullptr);
}
-FNET_Config *
-FNET_IOComponent::GetConfig() {
- return _ioc_owner->GetConfig();
+const FNET_Config &
+FNET_IOComponent::getConfig() const {
+ return _ioc_owner->getConfig();
}
void
diff --git a/fnet/src/vespa/fnet/iocomponent.h b/fnet/src/vespa/fnet/iocomponent.h
index 84e3c8bd412..5fd5079a8af 100644
--- a/fnet/src/vespa/fnet/iocomponent.h
+++ b/fnet/src/vespa/fnet/iocomponent.h
@@ -138,7 +138,7 @@ public:
*
* @return config object.
**/
- FNET_Config *GetConfig();
+ const FNET_Config & getConfig() const;
/**
diff --git a/fnet/src/vespa/fnet/transport.cpp b/fnet/src/vespa/fnet/transport.cpp
index e59186069ce..ce5f44efb7c 100644
--- a/fnet/src/vespa/fnet/transport.cpp
+++ b/fnet/src/vespa/fnet/transport.cpp
@@ -25,15 +25,33 @@ VESPA_THREAD_STACK_TAG(fnet_work_pool);
} // namespace <unnamed>
-FNET_Transport::FNET_Transport(vespalib::AsyncResolver::SP resolver, vespalib::CryptoEngine::SP crypto, size_t num_threads)
- : _async_resolver(std::move(resolver)),
- _crypto_engine(std::move(crypto)),
+TransportConfig::TransportConfig(int num_threads)
+ : _config(),
+ _resolver(),
+ _crypto(),
+ _num_threads(num_threads)
+{}
+
+TransportConfig::~TransportConfig() = default;
+
+vespalib::AsyncResolver::SP
+TransportConfig::resolver() const {
+ return _resolver ? _resolver : vespalib::AsyncResolver::get_shared();
+}
+vespalib::CryptoEngine::SP
+TransportConfig::crypto() const {
+ return _crypto ? _crypto : vespalib::CryptoEngine::get_default();
+}
+
+FNET_Transport::FNET_Transport(TransportConfig cfg)
+ : _async_resolver(cfg.resolver()),
+ _crypto_engine(cfg.crypto()),
_work_pool(std::make_unique<vespalib::ThreadStackExecutor>(1, 128 * 1024, fnet_work_pool, 1024)),
_threads(),
- _events_before_wakeup(1)
+ _config(cfg.config())
{
- assert(num_threads >= 1);
- for (size_t i = 0; i < num_threads; ++i) {
+ assert(cfg.num_threads() >= 1);
+ for (size_t i = 0; i < cfg.num_threads(); ++i) {
_threads.emplace_back(std::make_unique<FNET_TransportThread>(*this));
}
}
@@ -104,38 +122,6 @@ FNET_Transport::GetNumIOComponents()
}
void
-FNET_Transport::SetIOCTimeOut(uint32_t ms)
-{
- for (const auto &thread: _threads) {
- thread->SetIOCTimeOut(ms);
- }
-}
-
-void
-FNET_Transport::SetMaxInputBufferSize(uint32_t bytes)
-{
- for (const auto &thread: _threads) {
- thread->SetMaxInputBufferSize(bytes);
- }
-}
-
-void
-FNET_Transport::SetMaxOutputBufferSize(uint32_t bytes)
-{
- for (const auto &thread: _threads) {
- thread->SetMaxOutputBufferSize(bytes);
- }
-}
-
-void
-FNET_Transport::SetTCPNoDelay(bool noDelay)
-{
- for (const auto &thread: _threads) {
- thread->SetTCPNoDelay(noDelay);
- }
-}
-
-void
FNET_Transport::sync()
{
for (const auto &thread: _threads) {
diff --git a/fnet/src/vespa/fnet/transport.h b/fnet/src/vespa/fnet/transport.h
index 4acac4d6c6e..0ac7f473789 100644
--- a/fnet/src/vespa/fnet/transport.h
+++ b/fnet/src/vespa/fnet/transport.h
@@ -17,6 +17,34 @@ class FNET_IServerAdapter;
class FNET_IPacketHandler;
class FNET_Scheduler;
+class TransportConfig {
+public:
+ TransportConfig() : TransportConfig(1) {}
+ explicit TransportConfig(int num_threads);
+ ~TransportConfig();
+ vespalib::AsyncResolver::SP resolver() const;
+ vespalib::CryptoEngine::SP crypto() const;
+ TransportConfig & resolver(vespalib::AsyncResolver::SP resolver_in) {
+ _resolver = std::move(resolver_in);
+ return *this;
+ }
+ TransportConfig & crypto(vespalib::CryptoEngine::SP crypto_in) {
+ _crypto = std::move(crypto_in);
+ return *this;
+ }
+ TransportConfig & config(const FNET_Config & config_in) {
+ _config = config_in;
+ return *this;
+ }
+ const FNET_Config & config() const { return _config; }
+ FNET_Config & config() { return _config; }
+ uint32_t num_threads() const { return _num_threads; }
+private:
+ FNET_Config _config;
+ vespalib::AsyncResolver::SP _resolver;
+ vespalib::CryptoEngine::SP _crypto;
+ uint32_t _num_threads;
+};
/**
* This class represents the transport layer and handles a collection
* of transport threads. Note: remember to shut down your transport
@@ -31,8 +59,8 @@ private:
vespalib::AsyncResolver::SP _async_resolver;
vespalib::CryptoEngine::SP _crypto_engine;
std::unique_ptr<vespalib::SyncableThreadExecutor> _work_pool;
- Threads _threads;
- size_t _events_before_wakeup;
+ Threads _threads;
+ FNET_Config _config;
public:
/**
@@ -42,22 +70,15 @@ public:
* the current thread become the transport thread. Main may only
* be called for single-threaded transports.
**/
- FNET_Transport(vespalib::AsyncResolver::SP resolver, vespalib::CryptoEngine::SP crypto, size_t num_threads);
+ FNET_Transport(TransportConfig config);
- FNET_Transport(vespalib::AsyncResolver::SP resolver, size_t num_threads)
- : FNET_Transport(std::move(resolver), vespalib::CryptoEngine::get_default(), num_threads) {}
- FNET_Transport(vespalib::CryptoEngine::SP crypto, size_t num_threads)
- : FNET_Transport(vespalib::AsyncResolver::get_shared(), std::move(crypto), num_threads) {}
- FNET_Transport(size_t num_threads)
- : FNET_Transport(vespalib::AsyncResolver::get_shared(), vespalib::CryptoEngine::get_default(), num_threads) {}
+ explicit FNET_Transport(uint32_t num_threads)
+ : FNET_Transport(TransportConfig(num_threads)) {}
FNET_Transport()
- : FNET_Transport(vespalib::AsyncResolver::get_shared(), vespalib::CryptoEngine::get_default(), 1) {}
+ : FNET_Transport(TransportConfig()) {}
~FNET_Transport();
- size_t events_before_wakeup() const { return _events_before_wakeup; }
- void events_before_wakeup(size_t events_before_wakeup_in) {
- _events_before_wakeup = events_before_wakeup_in;
- }
+ const FNET_Config & getConfig() const { return _config; }
/**
* Try to execute the given task on the internal work pool
@@ -174,54 +195,6 @@ public:
uint32_t GetNumIOComponents();
/**
- * Set the I/O Component timeout. Idle I/O Components with timeout
- * enabled (determined by calling the ShouldTimeOut method) will
- * time out if idle for the given number of milliseconds. An I/O
- * component reports its un-idle-ness by calling the UpdateTimeOut
- * method in the owning transport object. Calling this method with 0
- * as parameter will disable I/O Component timeouts. Note that newly
- * created transport objects begin their lives with I/O Component
- * timeouts disabled. An I/O Component timeout has the same effect
- * as calling the Close method in the transport object with the
- * target I/O Component as parameter.
- *
- * @param ms number of milliseconds before IOC idle timeout occurs.
- **/
- void SetIOCTimeOut(uint32_t ms);
-
- /**
- * Set maximum input buffer size. This value will only affect
- * connections that use a common input buffer when decoding
- * incoming packets. Note that this value is not an absolute
- * max. The buffer will still grow larger than this value if
- * needed to decode big packets. However, when the buffer becomes
- * larger than this value, it will be shrunk back when possible.
- *
- * @param bytes buffer size in bytes. 0 means unlimited.
- **/
- void SetMaxInputBufferSize(uint32_t bytes);
-
- /**
- * Set maximum output buffer size. This value will only affect
- * connections that use a common output buffer when encoding
- * outgoing packets. Note that this value is not an absolute
- * max. The buffer will still grow larger than this value if needed
- * to encode big packets. However, when the buffer becomes larger
- * than this value, it will be shrunk back when possible.
- *
- * @param bytes buffer size in bytes. 0 means unlimited.
- **/
- void SetMaxOutputBufferSize(uint32_t bytes);
-
- /**
- * Enable or disable use of the TCP_NODELAY flag with sockets
- * created by this transport object.
- *
- * @param noDelay true if TCP_NODELAY flag should be used.
- **/
- void SetTCPNoDelay(bool noDelay);
-
- /**
* Synchronize with all transport threads. This method will block
* until all events posted before this method was invoked has been
* processed. If a transport thread has been shut down (or is in
diff --git a/fnet/src/vespa/fnet/transport_thread.cpp b/fnet/src/vespa/fnet/transport_thread.cpp
index d61eaffa24f..6b66b1ebe4b 100644
--- a/fnet/src/vespa/fnet/transport_thread.cpp
+++ b/fnet/src/vespa/fnet/transport_thread.cpp
@@ -125,7 +125,7 @@ FNET_TransportThread::PostEvent(FNET_ControlPacket *cpacket,
_queue.QueuePacket_NoLock(cpacket, context);
qLen = _queue.GetPacketCnt_NoLock();
}
- if (qLen == _owner.events_before_wakeup()) {
+ if (qLen == getConfig()._events_before_wakeup) {
_selector.wakeup();
}
return true;
@@ -209,7 +209,6 @@ FNET_TransportThread::FNET_TransportThread(FNET_Transport &owner_in)
: _owner(owner_in),
_now(clock::now()),
_scheduler(&_now),
- _config(),
_componentsHead(nullptr),
_timeOutHead(nullptr),
_componentsTail(nullptr),
@@ -242,13 +241,17 @@ FNET_TransportThread::~FNET_TransportThread()
}
}
+const FNET_Config &
+FNET_TransportThread::getConfig() const {
+ return _owner.getConfig();
+}
bool
FNET_TransportThread::tune(SocketHandle &handle) const
{
handle.set_keepalive(true);
handle.set_linger(true, 0);
- handle.set_nodelay(_config._tcpNoDelay);
+ handle.set_nodelay(getConfig()._tcpNoDelay);
return handle.set_blocking(false);
}
@@ -486,8 +489,8 @@ FNET_TransportThread::EventLoopIteration()
_selector.dispatch(*this);
// handle IOC time-outs
- if (_config._iocTimeOut > 0) {
- time_point oldest = (_now - std::chrono::milliseconds(_config._iocTimeOut));
+ if (getConfig()._iocTimeOut > 0) {
+ time_point oldest = (_now - std::chrono::milliseconds(getConfig()._iocTimeOut));
while (_timeOutHead != nullptr &&
oldest > _timeOutHead->_ioc_timestamp) {
diff --git a/fnet/src/vespa/fnet/transport_thread.h b/fnet/src/vespa/fnet/transport_thread.h
index b4319d4e2bc..fd61d99a8ac 100644
--- a/fnet/src/vespa/fnet/transport_thread.h
+++ b/fnet/src/vespa/fnet/transport_thread.h
@@ -37,7 +37,6 @@ private:
FNET_Transport &_owner; // owning transport layer
time_point _now; // current time sampler
FNET_Scheduler _scheduler; // transport thread scheduler
- FNET_Config _config; // FNET configuration [static]
FNET_IOComponent *_componentsHead; // I/O component list head
FNET_IOComponent *_timeOutHead; // first IOC in list to time out
FNET_IOComponent *_componentsTail; // I/O component list tail
@@ -140,7 +139,7 @@ private:
*
* @return config object.
**/
- FNET_Config *GetConfig() { return &_config; }
+ const FNET_Config & getConfig() const;
void handle_add_cmd(FNET_IOComponent *ioc);
@@ -269,60 +268,6 @@ public:
**/
uint32_t GetNumIOComponents() { return _componentCnt; }
-
- /**
- * Set the I/O Component timeout. Idle I/O Components with timeout
- * enabled (determined by calling the ShouldTimeOut method) will
- * time out if idle for the given number of milliseconds. An I/O
- * component reports its un-idle-ness by calling the UpdateTimeOut
- * method in the owning transport object. Calling this method with 0
- * as parameter will disable I/O Component timeouts. Note that newly
- * created transport objects begin their lives with I/O Component
- * timeouts disabled. An I/O Component timeout has the same effect
- * as calling the Close method in the transport object with the
- * target I/O Component as parameter.
- *
- * @param ms number of milliseconds before IOC idle timeout occurs.
- **/
- void SetIOCTimeOut(uint32_t ms) { _config._iocTimeOut = ms; }
-
-
- /**
- * Set maximum input buffer size. This value will only affect
- * connections that use a common input buffer when decoding
- * incoming packets. Note that this value is not an absolute
- * max. The buffer will still grow larger than this value if
- * needed to decode big packets. However, when the buffer becomes
- * larger than this value, it will be shrunk back when possible.
- *
- * @param bytes buffer size in bytes. 0 means unlimited.
- **/
- void SetMaxInputBufferSize(uint32_t bytes)
- { _config._maxInputBufferSize = bytes; }
-
-
- /**
- * Set maximum output buffer size. This value will only affect
- * connections that use a common output buffer when encoding
- * outgoing packets. Note that this value is not an absolute
- * max. The buffer will still grow larger than this value if needed
- * to encode big packets. However, when the buffer becomes larger
- * than this value, it will be shrunk back when possible.
- *
- * @param bytes buffer size in bytes. 0 means unlimited.
- **/
- void SetMaxOutputBufferSize(uint32_t bytes)
- { _config._maxOutputBufferSize = bytes; }
-
- /**
- * Enable or disable use of the TCP_NODELAY flag with sockets
- * created by this transport object.
- *
- * @param noDelay true if TCP_NODELAY flag should be used.
- **/
- void SetTCPNoDelay(bool noDelay) { _config._tcpNoDelay = noDelay; }
-
-
/**
* Add an I/O component to the working set of this transport
* object. Note that the actual work is performed by the transport