aboutsummaryrefslogtreecommitdiffstats
path: root/fnet
diff options
context:
space:
mode:
authorHåvard Pettersen <havardpe@oath.com>2021-06-01 09:13:14 +0000
committerHåvard Pettersen <havardpe@oath.com>2021-06-01 11:22:31 +0000
commit5a66d61ea772f59583c9af03087014d74051fecf (patch)
tree77dda4d754178a287eb98c53b0789eccca2abb7b /fnet
parent97bb8e47324450b1cd2ec77a5ee2b03686631bce (diff)
drop empty buffers
Diffstat (limited to 'fnet')
-rw-r--r--fnet/src/tests/connect/connect_test.cpp1
-rw-r--r--fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp43
-rw-r--r--fnet/src/vespa/fnet/config.cpp3
-rw-r--r--fnet/src/vespa/fnet/config.h1
-rw-r--r--fnet/src/vespa/fnet/connection.cpp12
-rw-r--r--fnet/src/vespa/fnet/connection.h7
-rw-r--r--fnet/src/vespa/fnet/transport.h5
7 files changed, 58 insertions, 14 deletions
diff --git a/fnet/src/tests/connect/connect_test.cpp b/fnet/src/tests/connect/connect_test.cpp
index 377035726dd..bb2acdb141e 100644
--- a/fnet/src/tests/connect/connect_test.cpp
+++ b/fnet/src/tests/connect/connect_test.cpp
@@ -65,6 +65,7 @@ struct BlockingCryptoSocket : public CryptoSocket {
ssize_t write(const char *buf, size_t len) override { return socket.write(buf, len); }
ssize_t flush() override { return 0; }
ssize_t half_close() override { return socket.half_close(); }
+ void drop_empty_buffers() override {}
};
struct BlockingCryptoEngine : public CryptoEngine {
diff --git a/fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp b/fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp
index 58acb928540..b03df359715 100644
--- a/fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp
+++ b/fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp
@@ -18,8 +18,8 @@ struct Rpc : FRT_Invokable {
FastOS_ThreadPool thread_pool;
FNET_Transport transport;
FRT_Supervisor orb;
- Rpc(CryptoEngine::SP crypto, size_t num_threads)
- : thread_pool(128_Ki), transport(TransportConfig(num_threads).crypto(std::move(crypto))), orb(&transport) {}
+ Rpc(CryptoEngine::SP crypto, size_t num_threads, bool drop_empty)
+ : thread_pool(128_Ki), transport(TransportConfig(num_threads).crypto(std::move(crypto)).drop_empty_buffers(drop_empty)), orb(&transport) {}
void start() {
ASSERT_TRUE(transport.Start(&thread_pool));
}
@@ -38,7 +38,7 @@ struct Rpc : FRT_Invokable {
struct Server : Rpc {
uint32_t port;
- Server(CryptoEngine::SP crypto, size_t num_threads) : Rpc(std::move(crypto), num_threads), port(listen()) {
+ Server(CryptoEngine::SP crypto, size_t num_threads, bool drop_empty = false) : Rpc(std::move(crypto), num_threads, drop_empty), port(listen()) {
init_rpc();
start();
}
@@ -58,7 +58,7 @@ struct Server : Rpc {
struct Client : Rpc {
uint32_t port;
- Client(CryptoEngine::SP crypto, size_t num_threads, const Server &server) : Rpc(std::move(crypto), num_threads), port(server.port) {
+ Client(CryptoEngine::SP crypto, size_t num_threads, const Server &server, bool drop_empty = false) : Rpc(std::move(crypto), num_threads, drop_empty), port(server.port) {
start();
}
FRT_Target *connect() { return Rpc::connect(port); }
@@ -85,7 +85,16 @@ struct Result {
}
};
-void perform_test(size_t thread_id, Client &client, Result &result) {
+bool verbose = false;
+double budget = 1.5;
+
+void perform_test(size_t thread_id, Client &client, Result &result, bool vital = false) {
+ if (!vital && !verbose) {
+ if (thread_id == 0) {
+ fprintf(stderr, "... skipping non-vital test; run with 'verbose' to enable\n");
+ }
+ return;
+ }
uint64_t seq = 0;
FRT_Target *target = client.connect();
FRT_RPCRequest *req = client.orb.AllocRPCRequest();
@@ -101,7 +110,7 @@ void perform_test(size_t thread_id, Client &client, Result &result) {
};
size_t loop_cnt = 8;
BenchmarkTimer::benchmark(invoke, invoke, 0.5);
- BenchmarkTimer timer(1.5);
+ BenchmarkTimer timer(budget);
while (timer.has_budget()) {
timer.before();
for (size_t i = 0; i < loop_cnt; ++i) {
@@ -139,13 +148,29 @@ TEST_MT_FFF("parallel rpc with 1/1 transport threads and num_cores user threads
TEST_MT_FFF("parallel rpc with 1/1 transport threads and num_cores user threads (tls encryption)",
getNumThreads(), Server(tls_crypto, 1), Client(tls_crypto, 1, f1), Result(num_threads)) { perform_test(thread_id, f2, f3); }
+TEST_MT_FFF("parallel rpc with 1/1 transport threads and num_cores user threads (tls encryption + drop empty buffers)",
+ getNumThreads(), Server(tls_crypto, 1, true), Client(tls_crypto, 1, f1, true), Result(num_threads)) { perform_test(thread_id, f2, f3); }
+
TEST_MT_FFF("parallel rpc with 8/8 transport threads and num_cores user threads (no encryption)",
- getNumThreads(), Server(null_crypto, 8), Client(null_crypto, 8, f1), Result(num_threads)) { perform_test(thread_id, f2, f3); }
+ getNumThreads(), Server(null_crypto, 8), Client(null_crypto, 8, f1), Result(num_threads)) { perform_test(thread_id, f2, f3, true); }
TEST_MT_FFF("parallel rpc with 8/8 transport threads and num_cores user threads (xor encryption)",
getNumThreads(), Server(xor_crypto, 8), Client(xor_crypto, 8, f1), Result(num_threads)) { perform_test(thread_id, f2, f3); }
TEST_MT_FFF("parallel rpc with 8/8 transport threads and num_cores user threads (tls encryption)",
- getNumThreads(), Server(tls_crypto, 8), Client(tls_crypto, 8, f1), Result(num_threads)) { perform_test(thread_id, f2, f3); }
+ getNumThreads(), Server(tls_crypto, 8), Client(tls_crypto, 8, f1), Result(num_threads)) { perform_test(thread_id, f2, f3, true); }
-TEST_MAIN() { TEST_RUN_ALL(); }
+TEST_MT_FFF("parallel rpc with 8/8 transport threads and num_cores user threads (tls encryption + drop empty buffers)",
+ getNumThreads(), Server(tls_crypto, 8, true), Client(tls_crypto, 8, f1, true), Result(num_threads)) { perform_test(thread_id, f2, f3); }
+
+//-----------------------------------------------------------------------------
+
+int main(int argc, char **argv) {
+ TEST_MASTER.init(__FILE__);
+ if ((argc == 2) && (argv[1] == std::string("verbose"))) {
+ verbose = true;
+ budget = 10.0;
+ }
+ TEST_RUN_ALL();
+ return (TEST_MASTER.fini() ? 0 : 1);
+}
diff --git a/fnet/src/vespa/fnet/config.cpp b/fnet/src/vespa/fnet/config.cpp
index 856d0ab4c1c..daf2b57429f 100644
--- a/fnet/src/vespa/fnet/config.cpp
+++ b/fnet/src/vespa/fnet/config.cpp
@@ -7,6 +7,7 @@ FNET_Config::FNET_Config()
_events_before_wakeup(1),
_maxInputBufferSize(0x10000),
_maxOutputBufferSize(0x10000),
- _tcpNoDelay(true)
+ _tcpNoDelay(true),
+ _drop_empty_buffers(false)
{
}
diff --git a/fnet/src/vespa/fnet/config.h b/fnet/src/vespa/fnet/config.h
index fd6f9f8557d..6cb1306a114 100644
--- a/fnet/src/vespa/fnet/config.h
+++ b/fnet/src/vespa/fnet/config.h
@@ -16,6 +16,7 @@ public:
uint32_t _maxInputBufferSize;
uint32_t _maxOutputBufferSize;
bool _tcpNoDelay;
+ bool _drop_empty_buffers;
FNET_Config();
};
diff --git a/fnet/src/vespa/fnet/connection.cpp b/fnet/src/vespa/fnet/connection.cpp
index a2e6fe25edc..47d6a1e429a 100644
--- a/fnet/src/vespa/fnet/connection.cpp
+++ b/fnet/src/vespa/fnet/connection.cpp
@@ -347,6 +347,10 @@ done_read:
}
UpdateTimeOut();
+ if (_flags._drop_empty_buffers) {
+ _socket->drop_empty_buffers();
+ _input.Shrink(0);
+ }
uint32_t maxSize = getConfig()._maxInputBufferSize;
if (maxSize > 0 && _input.GetBufSize() > maxSize)
{
@@ -430,6 +434,10 @@ FNET_Connection::Write()
}
}
+ if (_flags._drop_empty_buffers) {
+ _socket->drop_empty_buffers();
+ _output.Shrink(0);
+ }
uint32_t maxSize = getConfig()._maxOutputBufferSize;
if (maxSize > 0 && _output.GetBufSize() > maxSize) {
_output.Shrink(maxSize);
@@ -477,7 +485,7 @@ FNET_Connection::FNET_Connection(FNET_TransportThread *owner,
_resolve_handler(nullptr),
_context(),
_state(FNET_CONNECTING),
- _flags(),
+ _flags(owner->owner().getConfig()),
_packetLength(0),
_packetCode(0),
_packetCHID(0),
@@ -511,7 +519,7 @@ FNET_Connection::FNET_Connection(FNET_TransportThread *owner,
_resolve_handler(nullptr),
_context(context),
_state(FNET_CONNECTING),
- _flags(),
+ _flags(owner->owner().getConfig()),
_packetLength(0),
_packetCode(0),
_packetCHID(0),
diff --git a/fnet/src/vespa/fnet/connection.h b/fnet/src/vespa/fnet/connection.h
index c9c49c5151a..3da9b58f928 100644
--- a/fnet/src/vespa/fnet/connection.h
+++ b/fnet/src/vespa/fnet/connection.h
@@ -2,6 +2,7 @@
#pragma once
+#include "config.h"
#include "iocomponent.h"
#include "databuffer.h"
#include "context.h"
@@ -67,13 +68,14 @@ public:
private:
struct Flags {
- Flags() :
+ Flags(const FNET_Config &cfg) :
_gotheader(false),
_inCallback(false),
_callbackWait(false),
_discarding(false),
_framed(false),
- _handshake_work_pending(false)
+ _handshake_work_pending(false),
+ _drop_empty_buffers(cfg._drop_empty_buffers)
{ }
bool _gotheader;
bool _inCallback;
@@ -81,6 +83,7 @@ private:
bool _discarding;
bool _framed;
bool _handshake_work_pending;
+ bool _drop_empty_buffers;
};
struct ResolveHandler : public vespalib::AsyncResolver::ResultHandler {
FNET_Connection *connection;
diff --git a/fnet/src/vespa/fnet/transport.h b/fnet/src/vespa/fnet/transport.h
index 766eaa3ccaa..6a59f9da66c 100644
--- a/fnet/src/vespa/fnet/transport.h
+++ b/fnet/src/vespa/fnet/transport.h
@@ -53,6 +53,11 @@ public:
_config._tcpNoDelay = v;
return *this;
}
+ TransportConfig &drop_empty_buffers(bool v) {
+ _config._drop_empty_buffers = v;
+ return *this;
+ }
+
private:
FNET_Config _config;
vespalib::AsyncResolver::SP _resolver;