diff options
author | Håvard Pettersen <havardpe@oath.com> | 2021-06-01 09:13:14 +0000 |
---|---|---|
committer | Håvard Pettersen <havardpe@oath.com> | 2021-06-01 11:22:31 +0000 |
commit | 5a66d61ea772f59583c9af03087014d74051fecf (patch) | |
tree | 77dda4d754178a287eb98c53b0789eccca2abb7b /fnet | |
parent | 97bb8e47324450b1cd2ec77a5ee2b03686631bce (diff) |
drop empty buffers
Diffstat (limited to 'fnet')
-rw-r--r-- | fnet/src/tests/connect/connect_test.cpp | 1 | ||||
-rw-r--r-- | fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp | 43 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/config.cpp | 3 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/config.h | 1 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/connection.cpp | 12 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/connection.h | 7 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/transport.h | 5 |
7 files changed, 58 insertions, 14 deletions
diff --git a/fnet/src/tests/connect/connect_test.cpp b/fnet/src/tests/connect/connect_test.cpp index 377035726dd..bb2acdb141e 100644 --- a/fnet/src/tests/connect/connect_test.cpp +++ b/fnet/src/tests/connect/connect_test.cpp @@ -65,6 +65,7 @@ struct BlockingCryptoSocket : public CryptoSocket { ssize_t write(const char *buf, size_t len) override { return socket.write(buf, len); } ssize_t flush() override { return 0; } ssize_t half_close() override { return socket.half_close(); } + void drop_empty_buffers() override {} }; struct BlockingCryptoEngine : public CryptoEngine { diff --git a/fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp b/fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp index 58acb928540..b03df359715 100644 --- a/fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp +++ b/fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp @@ -18,8 +18,8 @@ struct Rpc : FRT_Invokable { FastOS_ThreadPool thread_pool; FNET_Transport transport; FRT_Supervisor orb; - Rpc(CryptoEngine::SP crypto, size_t num_threads) - : thread_pool(128_Ki), transport(TransportConfig(num_threads).crypto(std::move(crypto))), orb(&transport) {} + Rpc(CryptoEngine::SP crypto, size_t num_threads, bool drop_empty) + : thread_pool(128_Ki), transport(TransportConfig(num_threads).crypto(std::move(crypto)).drop_empty_buffers(drop_empty)), orb(&transport) {} void start() { ASSERT_TRUE(transport.Start(&thread_pool)); } @@ -38,7 +38,7 @@ struct Rpc : FRT_Invokable { struct Server : Rpc { uint32_t port; - Server(CryptoEngine::SP crypto, size_t num_threads) : Rpc(std::move(crypto), num_threads), port(listen()) { + Server(CryptoEngine::SP crypto, size_t num_threads, bool drop_empty = false) : Rpc(std::move(crypto), num_threads, drop_empty), port(listen()) { init_rpc(); start(); } @@ -58,7 +58,7 @@ struct Server : Rpc { struct Client : Rpc { uint32_t port; - Client(CryptoEngine::SP crypto, size_t num_threads, const Server &server) : Rpc(std::move(crypto), num_threads), port(server.port) { + Client(CryptoEngine::SP crypto, size_t num_threads, const Server &server, bool drop_empty = false) : Rpc(std::move(crypto), num_threads, drop_empty), port(server.port) { start(); } FRT_Target *connect() { return Rpc::connect(port); } @@ -85,7 +85,16 @@ struct Result { } }; -void perform_test(size_t thread_id, Client &client, Result &result) { +bool verbose = false; +double budget = 1.5; + +void perform_test(size_t thread_id, Client &client, Result &result, bool vital = false) { + if (!vital && !verbose) { + if (thread_id == 0) { + fprintf(stderr, "... skipping non-vital test; run with 'verbose' to enable\n"); + } + return; + } uint64_t seq = 0; FRT_Target *target = client.connect(); FRT_RPCRequest *req = client.orb.AllocRPCRequest(); @@ -101,7 +110,7 @@ void perform_test(size_t thread_id, Client &client, Result &result) { }; size_t loop_cnt = 8; BenchmarkTimer::benchmark(invoke, invoke, 0.5); - BenchmarkTimer timer(1.5); + BenchmarkTimer timer(budget); while (timer.has_budget()) { timer.before(); for (size_t i = 0; i < loop_cnt; ++i) { @@ -139,13 +148,29 @@ TEST_MT_FFF("parallel rpc with 1/1 transport threads and num_cores user threads TEST_MT_FFF("parallel rpc with 1/1 transport threads and num_cores user threads (tls encryption)", getNumThreads(), Server(tls_crypto, 1), Client(tls_crypto, 1, f1), Result(num_threads)) { perform_test(thread_id, f2, f3); } +TEST_MT_FFF("parallel rpc with 1/1 transport threads and num_cores user threads (tls encryption + drop empty buffers)", + getNumThreads(), Server(tls_crypto, 1, true), Client(tls_crypto, 1, f1, true), Result(num_threads)) { perform_test(thread_id, f2, f3); } + TEST_MT_FFF("parallel rpc with 8/8 transport threads and num_cores user threads (no encryption)", - getNumThreads(), Server(null_crypto, 8), Client(null_crypto, 8, f1), Result(num_threads)) { perform_test(thread_id, f2, f3); } + getNumThreads(), Server(null_crypto, 8), Client(null_crypto, 8, f1), Result(num_threads)) { perform_test(thread_id, f2, f3, true); } TEST_MT_FFF("parallel rpc with 8/8 transport threads and num_cores user threads (xor encryption)", getNumThreads(), Server(xor_crypto, 8), Client(xor_crypto, 8, f1), Result(num_threads)) { perform_test(thread_id, f2, f3); } TEST_MT_FFF("parallel rpc with 8/8 transport threads and num_cores user threads (tls encryption)", - getNumThreads(), Server(tls_crypto, 8), Client(tls_crypto, 8, f1), Result(num_threads)) { perform_test(thread_id, f2, f3); } + getNumThreads(), Server(tls_crypto, 8), Client(tls_crypto, 8, f1), Result(num_threads)) { perform_test(thread_id, f2, f3, true); } -TEST_MAIN() { TEST_RUN_ALL(); } +TEST_MT_FFF("parallel rpc with 8/8 transport threads and num_cores user threads (tls encryption + drop empty buffers)", + getNumThreads(), Server(tls_crypto, 8, true), Client(tls_crypto, 8, f1, true), Result(num_threads)) { perform_test(thread_id, f2, f3); } + +//----------------------------------------------------------------------------- + +int main(int argc, char **argv) { + TEST_MASTER.init(__FILE__); + if ((argc == 2) && (argv[1] == std::string("verbose"))) { + verbose = true; + budget = 10.0; + } + TEST_RUN_ALL(); + return (TEST_MASTER.fini() ? 0 : 1); +} diff --git a/fnet/src/vespa/fnet/config.cpp b/fnet/src/vespa/fnet/config.cpp index 856d0ab4c1c..daf2b57429f 100644 --- a/fnet/src/vespa/fnet/config.cpp +++ b/fnet/src/vespa/fnet/config.cpp @@ -7,6 +7,7 @@ FNET_Config::FNET_Config() _events_before_wakeup(1), _maxInputBufferSize(0x10000), _maxOutputBufferSize(0x10000), - _tcpNoDelay(true) + _tcpNoDelay(true), + _drop_empty_buffers(false) { } diff --git a/fnet/src/vespa/fnet/config.h b/fnet/src/vespa/fnet/config.h index fd6f9f8557d..6cb1306a114 100644 --- a/fnet/src/vespa/fnet/config.h +++ b/fnet/src/vespa/fnet/config.h @@ -16,6 +16,7 @@ public: uint32_t _maxInputBufferSize; uint32_t _maxOutputBufferSize; bool _tcpNoDelay; + bool _drop_empty_buffers; FNET_Config(); }; diff --git a/fnet/src/vespa/fnet/connection.cpp b/fnet/src/vespa/fnet/connection.cpp index a2e6fe25edc..47d6a1e429a 100644 --- a/fnet/src/vespa/fnet/connection.cpp +++ b/fnet/src/vespa/fnet/connection.cpp @@ -347,6 +347,10 @@ done_read: } UpdateTimeOut(); + if (_flags._drop_empty_buffers) { + _socket->drop_empty_buffers(); + _input.Shrink(0); + } uint32_t maxSize = getConfig()._maxInputBufferSize; if (maxSize > 0 && _input.GetBufSize() > maxSize) { @@ -430,6 +434,10 @@ FNET_Connection::Write() } } + if (_flags._drop_empty_buffers) { + _socket->drop_empty_buffers(); + _output.Shrink(0); + } uint32_t maxSize = getConfig()._maxOutputBufferSize; if (maxSize > 0 && _output.GetBufSize() > maxSize) { _output.Shrink(maxSize); @@ -477,7 +485,7 @@ FNET_Connection::FNET_Connection(FNET_TransportThread *owner, _resolve_handler(nullptr), _context(), _state(FNET_CONNECTING), - _flags(), + _flags(owner->owner().getConfig()), _packetLength(0), _packetCode(0), _packetCHID(0), @@ -511,7 +519,7 @@ FNET_Connection::FNET_Connection(FNET_TransportThread *owner, _resolve_handler(nullptr), _context(context), _state(FNET_CONNECTING), - _flags(), + _flags(owner->owner().getConfig()), _packetLength(0), _packetCode(0), _packetCHID(0), diff --git a/fnet/src/vespa/fnet/connection.h b/fnet/src/vespa/fnet/connection.h index c9c49c5151a..3da9b58f928 100644 --- a/fnet/src/vespa/fnet/connection.h +++ b/fnet/src/vespa/fnet/connection.h @@ -2,6 +2,7 @@ #pragma once +#include "config.h" #include "iocomponent.h" #include "databuffer.h" #include "context.h" @@ -67,13 +68,14 @@ public: private: struct Flags { - Flags() : + Flags(const FNET_Config &cfg) : _gotheader(false), _inCallback(false), _callbackWait(false), _discarding(false), _framed(false), - _handshake_work_pending(false) + _handshake_work_pending(false), + _drop_empty_buffers(cfg._drop_empty_buffers) { } bool _gotheader; bool _inCallback; @@ -81,6 +83,7 @@ private: bool _discarding; bool _framed; bool _handshake_work_pending; + bool _drop_empty_buffers; }; struct ResolveHandler : public vespalib::AsyncResolver::ResultHandler { FNET_Connection *connection; diff --git a/fnet/src/vespa/fnet/transport.h b/fnet/src/vespa/fnet/transport.h index 766eaa3ccaa..6a59f9da66c 100644 --- a/fnet/src/vespa/fnet/transport.h +++ b/fnet/src/vespa/fnet/transport.h @@ -53,6 +53,11 @@ public: _config._tcpNoDelay = v; return *this; } + TransportConfig &drop_empty_buffers(bool v) { + _config._drop_empty_buffers = v; + return *this; + } + private: FNET_Config _config; vespalib::AsyncResolver::SP _resolver; |