summaryrefslogtreecommitdiffstats
path: root/fnet/src
diff options
context:
space:
mode:
Diffstat (limited to 'fnet/src')
-rw-r--r--fnet/src/examples/frt/rpc/rpc_callback_client.cpp11
-rw-r--r--fnet/src/examples/frt/rpc/rpc_callback_server.cpp15
-rw-r--r--fnet/src/examples/frt/rpc/rpc_server.cpp6
-rw-r--r--fnet/src/examples/proxy/proxy.cpp1
-rw-r--r--fnet/src/tests/frt/method_pt/method_pt.cpp14
-rw-r--r--fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp39
-rw-r--r--fnet/src/tests/frt/rpc/CMakeLists.txt2
-rw-r--r--fnet/src/tests/frt/rpc/detach_return_invoke.cpp2
-rw-r--r--fnet/src/tests/frt/rpc/invoke.cpp33
-rw-r--r--fnet/src/tests/frt/rpc/my_crypto_engine.hpp10
-rw-r--r--fnet/src/tests/frt/rpc/session.cpp4
-rw-r--r--fnet/src/tests/frt/rpc/sharedblob.cpp2
-rw-r--r--fnet/src/tests/info/info.cpp6
-rw-r--r--fnet/src/vespa/fnet/CMakeLists.txt1
-rw-r--r--fnet/src/vespa/fnet/config.cpp10
-rw-r--r--fnet/src/vespa/fnet/config.h4
-rw-r--r--fnet/src/vespa/fnet/connection.cpp98
-rw-r--r--fnet/src/vespa/fnet/connection.h18
-rw-r--r--fnet/src/vespa/fnet/fnet.h3
-rw-r--r--fnet/src/vespa/fnet/frt/invoker.cpp15
-rw-r--r--fnet/src/vespa/fnet/frt/invoker.h7
-rw-r--r--fnet/src/vespa/fnet/frt/reflection.cpp5
-rw-r--r--fnet/src/vespa/fnet/frt/reflection.h4
-rw-r--r--fnet/src/vespa/fnet/frt/rpcrequest.h2
-rw-r--r--fnet/src/vespa/fnet/frt/supervisor.cpp46
-rw-r--r--fnet/src/vespa/fnet/frt/supervisor.h2
-rw-r--r--fnet/src/vespa/fnet/iocomponent.cpp1
-rw-r--r--fnet/src/vespa/fnet/iocomponent.h44
-rw-r--r--fnet/src/vespa/fnet/stats.cpp98
-rw-r--r--fnet/src/vespa/fnet/stats.h103
-rw-r--r--fnet/src/vespa/fnet/transport.cpp8
-rw-r--r--fnet/src/vespa/fnet/transport.h8
-rw-r--r--fnet/src/vespa/fnet/transport_thread.cpp38
-rw-r--r--fnet/src/vespa/fnet/transport_thread.h73
34 files changed, 139 insertions, 594 deletions
diff --git a/fnet/src/examples/frt/rpc/rpc_callback_client.cpp b/fnet/src/examples/frt/rpc/rpc_callback_client.cpp
index 801de59b515..7c6434e870a 100644
--- a/fnet/src/examples/frt/rpc/rpc_callback_client.cpp
+++ b/fnet/src/examples/frt/rpc/rpc_callback_client.cpp
@@ -26,7 +26,7 @@ RPC::Init(FRT_Supervisor *s)
{
FRT_ReflectionBuilder rb(s);
//-------------------------------------------------------------------
- rb.DefineMethod("prod", "", "", true,
+ rb.DefineMethod("prod", "", "",
FRT_METHOD(RPC::Prod), this);
//-------------------------------------------------------------------
}
@@ -45,6 +45,7 @@ MyApp::Main()
printf("usage : rpc_server <connectspec>\n");
return 1;
}
+ bool ok = true;
RPC rpc;
FRT_Supervisor orb;
rpc.Init(&orb);
@@ -63,6 +64,7 @@ MyApp::Main()
printf("[error(%d): %s]\n",
req->GetErrorCode(),
req->GetErrorMessage());
+ ok = false;
}
printf("invokeCnt: %d\n", rpc.invokeCnt);
@@ -76,6 +78,7 @@ MyApp::Main()
printf("[error(%d): %s]\n",
req->GetErrorCode(),
req->GetErrorMessage());
+ ok = false;
}
printf("invokeCnt: %d\n", rpc.invokeCnt);
@@ -89,14 +92,18 @@ MyApp::Main()
printf("[error(%d): %s]\n",
req->GetErrorCode(),
req->GetErrorMessage());
+ ok = false;
}
printf("invokeCnt: %d\n", rpc.invokeCnt);
+ if (rpc.invokeCnt != 3) {
+ ok = false;
+ }
req->SubRef();
target->SubRef();
orb.ShutDown(true);
- return 0;
+ return ok ? 0 : 1;
}
diff --git a/fnet/src/examples/frt/rpc/rpc_callback_server.cpp b/fnet/src/examples/frt/rpc/rpc_callback_server.cpp
index 33207282bcc..ac7b34ebda0 100644
--- a/fnet/src/examples/frt/rpc/rpc_callback_server.cpp
+++ b/fnet/src/examples/frt/rpc/rpc_callback_server.cpp
@@ -2,6 +2,7 @@
#include <vespa/fnet/frt/frt.h>
#include <vespa/fastos/app.h>
+#include <thread>
#include <vespa/log/log.h>
LOG_SETUP("rpc_callback_server");
@@ -12,9 +13,7 @@ struct RPC : public FRT_Invokable
void Init(FRT_Supervisor *s);
};
-void
-RPC::CallBack(FRT_RPCRequest *req)
-{
+void do_callback(FRT_RPCRequest *req) {
FNET_Connection *conn = req->GetConnection();
FRT_RPCRequest *cb = new FRT_RPCRequest();
cb->SetMethodName(req->GetParams()->GetValue(0)._string._str);
@@ -25,6 +24,14 @@ RPC::CallBack(FRT_RPCRequest *req)
cb->GetErrorMessage());
}
cb->SubRef();
+ req->Return();
+}
+
+void
+RPC::CallBack(FRT_RPCRequest *req)
+{
+ req->Detach();
+ std::thread(do_callback, req).detach();
}
void
@@ -32,7 +39,7 @@ RPC::Init(FRT_Supervisor *s)
{
FRT_ReflectionBuilder rb(s);
//-------------------------------------------------------------------
- rb.DefineMethod("callBack", "s", "", false,
+ rb.DefineMethod("callBack", "s", "",
FRT_METHOD(RPC::CallBack), this);
//-------------------------------------------------------------------
}
diff --git a/fnet/src/examples/frt/rpc/rpc_server.cpp b/fnet/src/examples/frt/rpc/rpc_server.cpp
index 8947663216e..03d618133c9 100644
--- a/fnet/src/examples/frt/rpc/rpc_server.cpp
+++ b/fnet/src/examples/frt/rpc/rpc_server.cpp
@@ -28,21 +28,21 @@ RPCServer::InitRPC(FRT_Supervisor *s)
{
FRT_ReflectionBuilder rb(s);
//-------------------------------------------------------------------
- rb.DefineMethod("concat", "ss", "s", true,
+ rb.DefineMethod("concat", "ss", "s",
FRT_METHOD(RPCServer::RPC_concat), this);
rb.MethodDesc("Concatenate two strings");
rb.ParamDesc("string1", "a string");
rb.ParamDesc("string2", "another string");
rb.ReturnDesc("ret", "the concatenation of string1 and string2");
//-------------------------------------------------------------------
- rb.DefineMethod("addFloat", "ff", "f", true,
+ rb.DefineMethod("addFloat", "ff", "f",
FRT_METHOD(RPCServer::RPC_addFloat), this);
rb.MethodDesc("Add two floats");
rb.ParamDesc("float1", "a float");
rb.ParamDesc("float2", "another float");
rb.ReturnDesc("ret", "float1 + float2");
//-------------------------------------------------------------------
- rb.DefineMethod("addDouble", "dd", "d", true,
+ rb.DefineMethod("addDouble", "dd", "d",
FRT_METHOD(RPCServer::RPC_addDouble), this);
rb.MethodDesc("Add two doubles");
rb.ParamDesc("double1", "a double");
diff --git a/fnet/src/examples/proxy/proxy.cpp b/fnet/src/examples/proxy/proxy.cpp
index 653b445581f..a01a16ead9c 100644
--- a/fnet/src/examples/proxy/proxy.cpp
+++ b/fnet/src/examples/proxy/proxy.cpp
@@ -227,7 +227,6 @@ Proxy::Main()
if (listener != nullptr)
listener->SubRef();
- _transport.SetLogStats(true);
FNET_SignalShutDown ssd(_transport);
_transport.Main();
return 0;
diff --git a/fnet/src/tests/frt/method_pt/method_pt.cpp b/fnet/src/tests/frt/method_pt/method_pt.cpp
index db5905d6871..5417fddceeb 100644
--- a/fnet/src/tests/frt/method_pt/method_pt.cpp
+++ b/fnet/src/tests/frt/method_pt/method_pt.cpp
@@ -207,35 +207,35 @@ void initTest() {
//-------------------------------------------------------------------
- rb.DefineMethod("simpleMethod", "", "", true,
+ rb.DefineMethod("simpleMethod", "", "",
FRT_METHOD(SimpleHandler::RPC_Method),
_simpleHandler);
//-------------------------------------------------------------------
- rb.DefineMethod("mediumMethod1", "", "", true,
+ rb.DefineMethod("mediumMethod1", "", "",
FRT_METHOD(MediumHandler1::RPC_Method),
_mediumHandler1);
- rb.DefineMethod("mediumMethod2", "", "", true,
+ rb.DefineMethod("mediumMethod2", "", "",
FRT_METHOD(MediumHandler2::RPC_Method),
_mediumHandler2);
- rb.DefineMethod("mediumMethod3", "", "", true,
+ rb.DefineMethod("mediumMethod3", "", "",
FRT_METHOD(MediumHandler3::RPC_Method),
_mediumHandler3);
//-------------------------------------------------------------------
- rb.DefineMethod("complexMethod1", "", "", true,
+ rb.DefineMethod("complexMethod1", "", "",
FRT_METHOD(ComplexHandler1::RPC_Method),
_complexHandler1);
- rb.DefineMethod("complexMethod2", "", "", true,
+ rb.DefineMethod("complexMethod2", "", "",
FRT_METHOD(ComplexHandler2::RPC_Method),
_complexHandler2);
- rb.DefineMethod("complexMethod3", "", "", true,
+ rb.DefineMethod("complexMethod3", "", "",
FRT_METHOD(ComplexHandler3::RPC_Method),
_complexHandler3);
diff --git a/fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp b/fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp
index 4f8b4b82743..31aec84afd5 100644
--- a/fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp
+++ b/fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp
@@ -3,16 +3,19 @@
#include <vespa/vespalib/util/stringfmt.h>
#include <vespa/fnet/frt/frt.h>
#include <vespa/vespalib/util/benchmark_timer.h>
+#include <vespa/vespalib/net/crypto_engine.h>
+#include <vespa/vespalib/net/tls/tls_crypto_engine.h>
+#include <vespa/vespalib/test/make_tls_options_for_testing.h>
#include <thread>
-using vespalib::BenchmarkTimer;
+using namespace vespalib;
struct Rpc : FRT_Invokable {
FastOS_ThreadPool thread_pool;
FNET_Transport transport;
FRT_Supervisor orb;
- Rpc(size_t num_threads)
- : thread_pool(128 * 1024), transport(num_threads), orb(&transport, &thread_pool) {}
+ Rpc(CryptoEngine::SP crypto, size_t num_threads)
+ : thread_pool(128 * 1024), transport(crypto, num_threads), orb(&transport, &thread_pool) {}
void start() {
ASSERT_TRUE(transport.Start(&thread_pool));
}
@@ -31,13 +34,13 @@ struct Rpc : FRT_Invokable {
struct Server : Rpc {
uint32_t port;
- Server(size_t num_threads) : Rpc(num_threads), port(listen()) {
+ Server(CryptoEngine::SP crypto, size_t num_threads) : Rpc(crypto, num_threads), port(listen()) {
init_rpc();
start();
}
void init_rpc() {
FRT_ReflectionBuilder rb(&orb);
- rb.DefineMethod("inc", "l", "l", true, FRT_METHOD(Server::rpc_inc), this);
+ rb.DefineMethod("inc", "l", "l", FRT_METHOD(Server::rpc_inc), this);
rb.MethodDesc("increment a 64-bit integer");
rb.ParamDesc("in", "an integer (64 bit)");
rb.ReturnDesc("out", "in + 1 (64 bit)");
@@ -51,7 +54,7 @@ struct Server : Rpc {
struct Client : Rpc {
uint32_t port;
- Client(size_t num_threads, const Server &server) : Rpc(num_threads), port(server.port) {
+ Client(CryptoEngine::SP crypto, size_t num_threads, const Server &server) : Rpc(crypto, num_threads), port(server.port) {
start();
}
FRT_Target *connect() { return Rpc::connect(port); }
@@ -114,10 +117,26 @@ void perform_test(size_t thread_id, Client &client, Result &result) {
}
}
-TEST_MT_FFF("parallel rpc with 1/1 transport threads and 128 user threads",
- 128, Server(1), Client(1, f1), Result(num_threads)) { perform_test(thread_id, f2, f3); }
+CryptoEngine::SP null_crypto = std::make_shared<NullCryptoEngine>();
+CryptoEngine::SP xor_crypto = std::make_shared<XorCryptoEngine>();
+CryptoEngine::SP tls_crypto = std::make_shared<vespalib::TlsCryptoEngine>(vespalib::test::make_tls_options_for_testing());
-TEST_MT_FFF("parallel rpc with 8/8 transport threads and 128 user threads",
- 128, Server(8), Client(8, f1), Result(num_threads)) { perform_test(thread_id, f2, f3); }
+TEST_MT_FFF("parallel rpc with 1/1 transport threads and 128 user threads (no encryption)",
+ 128, Server(null_crypto, 1), Client(null_crypto, 1, f1), Result(num_threads)) { perform_test(thread_id, f2, f3); }
+
+TEST_MT_FFF("parallel rpc with 1/1 transport threads and 128 user threads (xor encryption)",
+ 128, Server(xor_crypto, 1), Client(xor_crypto, 1, f1), Result(num_threads)) { perform_test(thread_id, f2, f3); }
+
+TEST_MT_FFF("parallel rpc with 1/1 transport threads and 128 user threads (tls encryption)",
+ 128, Server(tls_crypto, 1), Client(tls_crypto, 1, f1), Result(num_threads)) { perform_test(thread_id, f2, f3); }
+
+TEST_MT_FFF("parallel rpc with 8/8 transport threads and 128 user threads (no encryption)",
+ 128, Server(null_crypto, 8), Client(null_crypto, 8, f1), Result(num_threads)) { perform_test(thread_id, f2, f3); }
+
+TEST_MT_FFF("parallel rpc with 8/8 transport threads and 128 user threads (xor encryption)",
+ 128, Server(xor_crypto, 8), Client(xor_crypto, 8, f1), Result(num_threads)) { perform_test(thread_id, f2, f3); }
+
+TEST_MT_FFF("parallel rpc with 8/8 transport threads and 128 user threads (tls encryption)",
+ 128, Server(tls_crypto, 8), Client(tls_crypto, 8, f1), Result(num_threads)) { perform_test(thread_id, f2, f3); }
TEST_MAIN() { TEST_RUN_ALL(); }
diff --git a/fnet/src/tests/frt/rpc/CMakeLists.txt b/fnet/src/tests/frt/rpc/CMakeLists.txt
index f935590ee77..2bacd37686a 100644
--- a/fnet/src/tests/frt/rpc/CMakeLists.txt
+++ b/fnet/src/tests/frt/rpc/CMakeLists.txt
@@ -7,6 +7,7 @@ vespa_add_executable(fnet_invoke_test_app TEST
)
vespa_add_test(NAME fnet_invoke_test_app COMMAND fnet_invoke_test_app)
vespa_add_test(NAME fnet_invoke_test_app_xor COMMAND fnet_invoke_test_app ENVIRONMENT "CRYPTOENGINE=xor")
+vespa_add_test(NAME fnet_invoke_test_app_tls COMMAND fnet_invoke_test_app ENVIRONMENT "CRYPTOENGINE=tls")
vespa_add_executable(fnet_detach_return_invoke_test_app TEST
SOURCES
detach_return_invoke.cpp
@@ -22,6 +23,7 @@ vespa_add_executable(fnet_session_test_app TEST
)
vespa_add_test(NAME fnet_session_test_app COMMAND fnet_session_test_app)
vespa_add_test(NAME fnet_session_test_app_xor COMMAND fnet_session_test_app ENVIRONMENT "CRYPTOENGINE=xor")
+vespa_add_test(NAME fnet_session_test_app_tls COMMAND fnet_session_test_app ENVIRONMENT "CRYPTOENGINE=tls")
vespa_add_executable(fnet_sharedblob_test_app TEST
SOURCES
sharedblob.cpp
diff --git a/fnet/src/tests/frt/rpc/detach_return_invoke.cpp b/fnet/src/tests/frt/rpc/detach_return_invoke.cpp
index 54a891261c2..ab21c62bb68 100644
--- a/fnet/src/tests/frt/rpc/detach_return_invoke.cpp
+++ b/fnet/src/tests/frt/rpc/detach_return_invoke.cpp
@@ -20,7 +20,7 @@ struct Server : public FRT_Invokable
Server(FRT_Supervisor &s, Receptor &r) : orb(s), receptor(r) {
FRT_ReflectionBuilder rb(&s);
- rb.DefineMethod("hook", "", "", true,
+ rb.DefineMethod("hook", "", "",
FRT_METHOD(Server::rpc_hook), this);
}
diff --git a/fnet/src/tests/frt/rpc/invoke.cpp b/fnet/src/tests/frt/rpc/invoke.cpp
index 787adb227f9..e3bd662214f 100644
--- a/fnet/src/tests/frt/rpc/invoke.cpp
+++ b/fnet/src/tests/frt/rpc/invoke.cpp
@@ -124,7 +124,7 @@ public:
assert(_echo_stash != nullptr && _echo_args != nullptr);
FRT_ReflectionBuilder rb(supervisor);
- rb.DefineMethod("echo", "*", "*", true,
+ rb.DefineMethod("echo", "*", "*",
FRT_METHOD(EchoTest::RPC_Echo), this);
FRT_Values *args = _echo_args;
@@ -225,17 +225,15 @@ public:
{
FRT_ReflectionBuilder rb(supervisor);
- rb.DefineMethod("inc", "i", "i", true,
+ rb.DefineMethod("inc", "i", "i",
FRT_METHOD(TestRPC::RPC_Inc), this);
- rb.DefineMethod("setValue", "i", "", true,
+ rb.DefineMethod("setValue", "i", "",
FRT_METHOD(TestRPC::RPC_SetValue), this);
- rb.DefineMethod("incValue", "", "", true,
+ rb.DefineMethod("incValue", "", "",
FRT_METHOD(TestRPC::RPC_IncValue), this);
- rb.DefineMethod("getValue", "", "i", true,
+ rb.DefineMethod("getValue", "", "i",
FRT_METHOD(TestRPC::RPC_GetValue), this);
- rb.DefineMethod("testFast", "iiibb", "i", true,
- FRT_METHOD(TestRPC::RPC_Test), this);
- rb.DefineMethod("testSlow", "iiibb", "i", false,
+ rb.DefineMethod("testFast", "iiibb", "i",
FRT_METHOD(TestRPC::RPC_Test), this);
}
@@ -364,7 +362,6 @@ const char phase_names[PHASE_ZZZ][32] =
enum {
TIMING_NULL = 0,
TIMING_INSTANT,
- TIMING_NON_INSTANT,
TIMING_ZZZ
};
@@ -372,7 +369,6 @@ const char timing_names[TIMING_ZZZ][32] =
{
"nullptr",
"INSTANT",
- "NON-INSTANT"
};
enum {
@@ -451,17 +447,10 @@ struct State {
void PrepareTestMethod()
{
NewReq();
- bool instant = (_timing == TIMING_INSTANT);
- if (_timing != TIMING_INSTANT &&
- _timing != TIMING_NON_INSTANT)
- {
+ if (_timing != TIMING_INSTANT) {
ASSERT_TRUE(false); // consult your dealer...
}
- if (instant) {
- _req->SetMethodName("testFast");
- } else {
- _req->SetMethodName("testSlow");
- }
+ _req->SetMethodName("testFast");
}
void SetTestParams(uint32_t value, uint32_t delay,
@@ -928,9 +917,9 @@ TEST_F("invoke test", State()) {
EXPECT_TRUE(_phase_simple_cnt == 1);
EXPECT_TRUE(_phase_void_cnt == 1);
EXPECT_TRUE(_phase_speed_cnt == 1);
- EXPECT_TRUE(_phase_advanced_cnt == 4);
- EXPECT_TRUE(_phase_error_cnt == 4);
- EXPECT_TRUE(_phase_abort_cnt == 4);
+ EXPECT_TRUE(_phase_advanced_cnt == 2);
+ EXPECT_TRUE(_phase_error_cnt == 2);
+ EXPECT_TRUE(_phase_abort_cnt == 2);
EXPECT_TRUE(_phase_echo_cnt == 1);
}
diff --git a/fnet/src/tests/frt/rpc/my_crypto_engine.hpp b/fnet/src/tests/frt/rpc/my_crypto_engine.hpp
index 6f573e5695a..6cd8d47e917 100644
--- a/fnet/src/tests/frt/rpc/my_crypto_engine.hpp
+++ b/fnet/src/tests/frt/rpc/my_crypto_engine.hpp
@@ -1,15 +1,21 @@
// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/vespalib/net/tls/tls_crypto_engine.h>
+#include <vespa/vespalib/test/make_tls_options_for_testing.h>
+
vespalib::CryptoEngine::SP my_crypto_engine() {
const char *env_str = getenv("CRYPTOENGINE");
if (!env_str) {
- fprintf(stderr, "crypto engine: default\n");
- return vespalib::CryptoEngine::get_default();
+ fprintf(stderr, "crypto engine: null\n");
+ return std::make_shared<vespalib::NullCryptoEngine>();
}
std::string engine(env_str);
if (engine == "xor") {
fprintf(stderr, "crypto engine: xor\n");
return std::make_shared<vespalib::XorCryptoEngine>();
+ } else if (engine == "tls") {
+ fprintf(stderr, "crypto engine: tls\n");
+ return std::make_shared<vespalib::TlsCryptoEngine>(vespalib::test::make_tls_options_for_testing());
}
TEST_FATAL(("invalid crypto engine: " + engine).c_str());
abort();
diff --git a/fnet/src/tests/frt/rpc/session.cpp b/fnet/src/tests/frt/rpc/session.cpp
index b84db9b4e88..93f14647e21 100644
--- a/fnet/src/tests/frt/rpc/session.cpp
+++ b/fnet/src/tests/frt/rpc/session.cpp
@@ -77,9 +77,9 @@ struct RPC : public FRT_Invokable
void Init(FRT_Supervisor *s)
{
FRT_ReflectionBuilder rb(s);
- rb.DefineMethod("getValue", "", "i", true,
+ rb.DefineMethod("getValue", "", "i",
FRT_METHOD(RPC::GetValue), this);
- rb.DefineMethod("setValue", "i", "", true,
+ rb.DefineMethod("setValue", "i", "",
FRT_METHOD(RPC::SetValue), this);
s->SetSessionInitHook(FRT_METHOD(RPC::InitSession), this);
s->SetSessionFiniHook(FRT_METHOD(RPC::FiniSession), this);
diff --git a/fnet/src/tests/frt/rpc/sharedblob.cpp b/fnet/src/tests/frt/rpc/sharedblob.cpp
index 10eaad9c013..a48ecbb1da7 100644
--- a/fnet/src/tests/frt/rpc/sharedblob.cpp
+++ b/fnet/src/tests/frt/rpc/sharedblob.cpp
@@ -176,7 +176,7 @@ TEST("testImplicitShared") {
ServerSampler serverSampler(dataSet, req);
{
FRT_ReflectionBuilder rb(&orb);
- rb.DefineMethod("test", "*", "*", true,
+ rb.DefineMethod("test", "*", "*",
FRT_METHOD(ServerSampler::RPC_test), &serverSampler);
}
orb.Listen(0);
diff --git a/fnet/src/tests/info/info.cpp b/fnet/src/tests/info/info.cpp
index f22c1402437..f76e66c2af6 100644
--- a/fnet/src/tests/info/info.cpp
+++ b/fnet/src/tests/info/info.cpp
@@ -24,7 +24,7 @@ struct RPC : public FRT_Invokable
{
FRT_ReflectionBuilder rb(s);
//-------------------------------------------------------------------
- rb.DefineMethod("getInfo", "", "sssii", true,
+ rb.DefineMethod("getInfo", "", "sssii",
FRT_METHOD(RPC::GetInfo), this);
// FastOS version
// FNET version
@@ -70,10 +70,10 @@ TEST("info") {
TEST("size of important objects")
{
- EXPECT_EQUAL(176u, sizeof(FNET_IOComponent));
+ EXPECT_EQUAL(168u, sizeof(FNET_IOComponent));
EXPECT_EQUAL(32u, sizeof(FNET_Channel));
EXPECT_EQUAL(40u, sizeof(FNET_PacketQueue_NoLock));
- EXPECT_EQUAL(480u, sizeof(FNET_Connection));
+ EXPECT_EQUAL(472u, sizeof(FNET_Connection));
EXPECT_EQUAL(48u, sizeof(std::condition_variable));
EXPECT_EQUAL(56u, sizeof(FNET_DataBuffer));
EXPECT_EQUAL(24u, sizeof(FastOS_Time));
diff --git a/fnet/src/vespa/fnet/CMakeLists.txt b/fnet/src/vespa/fnet/CMakeLists.txt
index 20badc5c489..4b9d818e5ed 100644
--- a/fnet/src/vespa/fnet/CMakeLists.txt
+++ b/fnet/src/vespa/fnet/CMakeLists.txt
@@ -17,7 +17,6 @@ vespa_add_library(fnet
scheduler.cpp
signalshutdown.cpp
simplepacketstreamer.cpp
- stats.cpp
task.cpp
transport.cpp
transport_thread.cpp
diff --git a/fnet/src/vespa/fnet/config.cpp b/fnet/src/vespa/fnet/config.cpp
index feed7f2d241..a546d38f78b 100644
--- a/fnet/src/vespa/fnet/config.cpp
+++ b/fnet/src/vespa/fnet/config.cpp
@@ -3,11 +3,9 @@
#include "config.h"
FNET_Config::FNET_Config()
- : _minEventTimeOut(0),
- _pingInterval(0),
- _iocTimeOut(0),
+ : _iocTimeOut(0),
_maxInputBufferSize(0x10000),
_maxOutputBufferSize(0x10000),
- _tcpNoDelay(true),
- _logStats(false)
-{ }
+ _tcpNoDelay(true)
+{
+}
diff --git a/fnet/src/vespa/fnet/config.h b/fnet/src/vespa/fnet/config.h
index e94cf0f6105..3f34c1511b6 100644
--- a/fnet/src/vespa/fnet/config.h
+++ b/fnet/src/vespa/fnet/config.h
@@ -11,14 +11,10 @@
class FNET_Config
{
public:
- uint32_t _minEventTimeOut;
- uint32_t _pingInterval;
uint32_t _iocTimeOut;
uint32_t _maxInputBufferSize;
uint32_t _maxOutputBufferSize;
bool _tcpNoDelay;
- bool _logStats;
FNET_Config();
};
-
diff --git a/fnet/src/vespa/fnet/connection.cpp b/fnet/src/vespa/fnet/connection.cpp
index b9db8a46a4f..f2864d1dd58 100644
--- a/fnet/src/vespa/fnet/connection.cpp
+++ b/fnet/src/vespa/fnet/connection.cpp
@@ -110,13 +110,6 @@ FNET_Connection::SetState(State state)
}
if (oldstate < FNET_CLOSING && state >= FNET_CLOSING) {
- if (_flags._writeLock) {
- _flags._discarding = true;
- while (_flags._writeLock)
- _ioc_cond.wait(guard);
- _flags._discarding = false;
- }
-
while (!_queue.IsEmpty_NoLock() || !_myQueue.IsEmpty_NoLock()) {
_flags._discarding = true;
_queue.FlushPackets_NoLock(&_myQueue);
@@ -233,14 +226,13 @@ FNET_Connection::handshake()
EnableReadEvent(true);
EnableWriteEvent(writePendingAfterConnect());
size_t chunk_size = std::max(size_t(FNET_READ_SIZE), _socket->min_read_buffer_size());
- uint32_t ignore_stats = 0;
ssize_t res = 0;
do { // drain input pipeline
_input.EnsureFree(chunk_size);
res = _socket->drain(_input.GetFree(), _input.GetFreeLen());
if (res > 0) {
_input.FreeToData((uint32_t)res);
- broken = !handle_packets(ignore_stats);
+ broken = !handle_packets();
_input.resetIfEmpty();
}
} while ((res > 0) && !broken); }
@@ -258,7 +250,7 @@ FNET_Connection::handshake()
}
bool
-FNET_Connection::handle_packets(uint32_t &read_packets)
+FNET_Connection::handle_packets()
{
bool broken = false;
for (bool done = false; !done;) { // handle each complete packet in the buffer.
@@ -268,7 +260,6 @@ FNET_Connection::handle_packets(uint32_t &read_packets)
&broken);
}
if (_flags._gotheader && (_input.GetDataLen() >= _packetLength)) {
- read_packets++;
HandlePacket(_packetLength, _packetCode, _packetCHID);
_flags._gotheader = false; // reset header flag.
} else {
@@ -282,26 +273,26 @@ bool
FNET_Connection::Read()
{
size_t chunk_size = std::max(size_t(FNET_READ_SIZE), _socket->min_read_buffer_size());
- uint32_t readData = 0; // total data read
- uint32_t readPackets = 0; // total packets read
int readCnt = 0; // read count
bool broken = false; // is this conn broken ?
+ int my_errno = 0; // sample and preserve errno
ssize_t res; // single read result
_input.EnsureFree(chunk_size);
res = _socket->read(_input.GetFree(), _input.GetFreeLen());
+ my_errno = errno;
readCnt++;
while (res > 0) {
_input.FreeToData((uint32_t)res);
- readData += (uint32_t)res;
- broken = !handle_packets(readPackets);
+ broken = !handle_packets();
_input.resetIfEmpty();
if (broken || (_input.GetFreeLen() > 0) || (readCnt >= FNET_READ_REDO)) {
goto done_read;
}
_input.EnsureFree(chunk_size);
res = _socket->read(_input.GetFree(), _input.GetFreeLen());
+ my_errno = errno;
readCnt++;
}
@@ -310,28 +301,24 @@ done_read:
while ((res > 0) && !broken) { // drain input pipeline
_input.EnsureFree(chunk_size);
res = _socket->drain(_input.GetFree(), _input.GetFreeLen());
+ my_errno = errno;
readCnt++;
if (res > 0) {
_input.FreeToData((uint32_t)res);
- readData += (uint32_t)res;
- broken = !handle_packets(readPackets);
+ broken = !handle_packets();
_input.resetIfEmpty();
} else if (res == 0) { // fully drained -> EWOULDBLOCK
- errno = EWOULDBLOCK;
+ my_errno = EWOULDBLOCK;
res = -1;
}
}
- if (readData > 0) {
- UpdateTimeOut();
- CountDataRead(readData);
- CountPacketRead(readPackets);
- uint32_t maxSize = GetConfig()->_maxInputBufferSize;
- if (maxSize > 0 && _input.GetBufSize() > maxSize)
- {
- if (!_flags._gotheader || _packetLength < maxSize) {
- _input.Shrink(maxSize);
- }
+ UpdateTimeOut();
+ uint32_t maxSize = GetConfig()->_maxInputBufferSize;
+ if (maxSize > 0 && _input.GetBufSize() > maxSize)
+ {
+ if (!_flags._gotheader || _packetLength < maxSize) {
+ _input.Shrink(maxSize);
}
}
@@ -339,9 +326,9 @@ done_read:
if (res == 0) {
broken = true; // handle EOF
} else { // res < 0
- broken = ((errno != EWOULDBLOCK) && (errno != EAGAIN));
- if (broken && (errno != ECONNRESET)) {
- LOG(debug, "Connection(%s): read error: %d", GetSpec(), errno);
+ broken = ((my_errno != EWOULDBLOCK) && (my_errno != EAGAIN));
+ if (broken && (my_errno != ECONNRESET)) {
+ LOG(debug, "Connection(%s): read error: %d", GetSpec(), my_errno);
}
}
}
@@ -354,10 +341,9 @@ bool
FNET_Connection::Write()
{
uint32_t my_write_work = 0;
- uint32_t writtenData = 0; // total data written
- uint32_t writtenPackets = 0; // total packets written
int writeCnt = 0; // write count
bool broken = false; // is this conn broken ?
+ int my_errno = 0; // sample and preserve errno
ssize_t res; // single write result
FNET_Packet *packet;
@@ -374,7 +360,6 @@ FNET_Connection::Write()
packet = _myQueue.DequeuePacket_NoLock(&context);
if (packet->IsRegularPacket()) { // ignore non-regular packets
_streamer->Encode(packet, context._value.INT, &_output);
- writtenPackets++;
}
packet->Free();
}
@@ -387,10 +372,10 @@ FNET_Connection::Write()
// write data
res = _socket->write(_output.GetData(), _output.GetDataLen());
+ my_errno = errno;
writeCnt++;
if (res > 0) {
_output.DataToDead((uint32_t)res);
- writtenData += (uint32_t)res;
_output.resetIfEmpty();
}
} while (res > 0 &&
@@ -404,26 +389,26 @@ FNET_Connection::Write()
if (res >= 0) { // flush output pipeline
res = _socket->flush();
+ my_errno = errno;
while (res > 0) {
res = _socket->flush();
+ my_errno = errno;
}
}
- if (writtenData > 0) {
- uint32_t maxSize = GetConfig()->_maxOutputBufferSize;
- if (maxSize > 0 && _output.GetBufSize() > maxSize) {
- _output.Shrink(maxSize);
- }
+ uint32_t maxSize = GetConfig()->_maxOutputBufferSize;
+ if (maxSize > 0 && _output.GetBufSize() > maxSize) {
+ _output.Shrink(maxSize);
}
if (res < 0) {
- if ((errno == EWOULDBLOCK) || (errno == EAGAIN)) {
+ if ((my_errno == EWOULDBLOCK) || (my_errno == EAGAIN)) {
++my_write_work; // incomplete write/flush
} else {
broken = true;
}
- if (broken && (errno != ECONNRESET)) {
- LOG(debug, "Connection(%s): write error: %d", GetSpec(), errno);
+ if (broken && (my_errno != ECONNRESET)) {
+ LOG(debug, "Connection(%s): write error: %d", GetSpec(), my_errno);
}
}
@@ -431,17 +416,9 @@ FNET_Connection::Write()
_writeWork = _queue.GetPacketCnt_NoLock()
+ _myQueue.GetPacketCnt_NoLock()
+ my_write_work;
- _flags._writeLock = false;
- if (_flags._discarding) {
- _ioc_cond.notify_all();
- }
bool writePending = (_writeWork > 0);
guard.unlock();
- if (writtenData > 0) {
- CountDataWrite(writtenData);
- CountPacketWrite(writtenPackets);
- }
if (!writePending)
EnableWriteEvent(false);
@@ -528,7 +505,6 @@ FNET_Connection::~FNET_Connection()
delete _adminChannel;
}
assert(_cleanup == nullptr);
- assert(!_flags._writeLock);
}
@@ -682,9 +658,7 @@ FNET_Connection::PostPacket(FNET_Packet *packet, uint32_t chid)
writeWork = _writeWork;
_writeWork++;
_queue.QueuePacket_NoLock(packet, FNET_Context(chid));
- if (writeWork == 0 && !_flags._writeLock &&
- _state == FNET_CONNECTED)
- {
+ if ((writeWork == 0) && (_state == FNET_CONNECTED)) {
AddRef_NoLock();
guard.unlock();
Owner()->EnableWrite(this, /* needRef = */ false);
@@ -693,14 +667,6 @@ FNET_Connection::PostPacket(FNET_Packet *packet, uint32_t chid)
}
-uint32_t
-FNET_Connection::GetQueueLen()
-{
- std::lock_guard<std::mutex> guard(_ioc_lock);
- return _queue.GetPacketCnt_NoLock() + _myQueue.GetPacketCnt_NoLock();
-}
-
-
void
FNET_Connection::Sync()
{
@@ -774,12 +740,6 @@ FNET_Connection::HandleWriteEvent()
case FNET_CONNECTED:
{
std::unique_lock<std::mutex> guard(_ioc_lock);
- if (_flags._writeLock) {
- guard.unlock();
- EnableWriteEvent(false);
- return true;
- }
- _flags._writeLock = true;
_queue.FlushPackets_NoLock(&_myQueue);
}
broken = !Write();
diff --git a/fnet/src/vespa/fnet/connection.h b/fnet/src/vespa/fnet/connection.h
index 8e5e6280fab..8e275d68b18 100644
--- a/fnet/src/vespa/fnet/connection.h
+++ b/fnet/src/vespa/fnet/connection.h
@@ -68,13 +68,11 @@ private:
struct Flags {
Flags() :
_gotheader(false),
- _writeLock(false),
_inCallback(false),
_callbackWait(false),
_discarding(false)
{ }
bool _gotheader;
- bool _writeLock;
bool _inCallback;
bool _callbackWait;
bool _discarding;
@@ -212,9 +210,8 @@ private:
* for each one.
*
* @return false if socket is broken.
- * @param read_packets count read packets here
**/
- bool handle_packets(uint32_t &read_packets);
+ bool handle_packets();
/**
* Read incoming data from socket.
@@ -450,19 +447,6 @@ public:
/**
- * Obtain the number of packets located in the output queue for this
- * connection. Note that this number is volatile and should only be
- * used as an estimate. Also note that since a queue latching
- * strategy is used, this method requires a mutex lock/unlock and is
- * therefore not as cheap as may be expected.
- *
- * @return number of packets currently located in the output queue
- * for this connection.
- **/
- uint32_t GetQueueLen();
-
-
- /**
* Sync with this connection. When this method is invoked it will
* block until all packets currently posted on this connection is
* encoded into the output buffer. Also, the amount of data in the
diff --git a/fnet/src/vespa/fnet/fnet.h b/fnet/src/vespa/fnet/fnet.h
index 5a3a8b28942..c7570e025ec 100644
--- a/fnet/src/vespa/fnet/fnet.h
+++ b/fnet/src/vespa/fnet/fnet.h
@@ -32,8 +32,6 @@ class FNET_Packet;
class FNET_PacketQueue;
class FNET_Scheduler;
class FNET_SimplePacketStreamer;
-class FNET_StatCounters;
-class FNET_Stats;
class FNET_Task;
class FNET_Transport;
class FNET_TransportThread;
@@ -52,7 +50,6 @@ class FNET_TransportThread;
#include "task.h"
#include "scheduler.h"
#include "config.h"
-#include "stats.h"
#include "databuffer.h"
#include "packet.h"
#include "dummypacket.h"
diff --git a/fnet/src/vespa/fnet/frt/invoker.cpp b/fnet/src/vespa/fnet/frt/invoker.cpp
index f2dc331c707..b174c3a710e 100644
--- a/fnet/src/vespa/fnet/frt/invoker.cpp
+++ b/fnet/src/vespa/fnet/frt/invoker.cpp
@@ -64,18 +64,14 @@ FRT_RPCInvoker::FRT_RPCInvoker(FRT_Supervisor *supervisor,
req->SetReturnHandler(this);
}
-bool FRT_RPCInvoker::IsInstant() {
- return _method->IsInstant();
-}
-
-bool FRT_RPCInvoker::Invoke(bool freeChannel)
+bool FRT_RPCInvoker::Invoke()
{
bool detached = false;
_req->SetDetachedPT(&detached);
(_method->GetHandler()->*_method->GetMethod())(_req);
if (detached)
return false;
- HandleDone(freeChannel);
+ HandleDone(false);
return true;
}
@@ -120,13 +116,6 @@ FRT_RPCInvoker::GetConnection()
return _req->GetContext()._value.CHANNEL->GetConnection();
}
-
-void
-FRT_RPCInvoker::Run(FastOS_ThreadInterface *, void *)
-{
- Invoke(true);
-}
-
//-----------------------------------------------------------------------------
void FRT_HookInvoker::Invoke()
diff --git a/fnet/src/vespa/fnet/frt/invoker.h b/fnet/src/vespa/fnet/frt/invoker.h
index 15d74017200..64adf66688e 100644
--- a/fnet/src/vespa/fnet/frt/invoker.h
+++ b/fnet/src/vespa/fnet/frt/invoker.h
@@ -59,8 +59,7 @@ public:
//-----------------------------------------------------------------------------
-class FRT_RPCInvoker : public FastOS_Runnable,
- public FRT_IReturnHandler
+class FRT_RPCInvoker : public FRT_IReturnHandler
{
private:
FRT_RPCRequest *_req;
@@ -76,15 +75,13 @@ public:
bool noReply);
void ForceMethod(FRT_Method *method) { _method = method; }
- bool IsInstant();
FRT_RPCRequest *GetRequest() { return _req; }
void HandleDone(bool freeChannel);
- bool Invoke(bool freeChannel);
+ bool Invoke();
void HandleReturn() override;
FNET_Connection *GetConnection() override;
- void Run(FastOS_ThreadInterface *, void *) override;
};
//-----------------------------------------------------------------------------
diff --git a/fnet/src/vespa/fnet/frt/reflection.cpp b/fnet/src/vespa/fnet/frt/reflection.cpp
index 4285c512ebf..305294f4a3c 100644
--- a/fnet/src/vespa/fnet/frt/reflection.cpp
+++ b/fnet/src/vespa/fnet/frt/reflection.cpp
@@ -6,13 +6,12 @@
#include "supervisor.h"
FRT_Method::FRT_Method(const char * name, const char * paramSpec, const char * returnSpec,
- bool instant, FRT_METHOD_PT method, FRT_Invokable * handler)
+ FRT_METHOD_PT method, FRT_Invokable * handler)
: _hashNext(nullptr),
_listNext(nullptr),
_name(strdup(name)),
_paramSpec(strdup(paramSpec)),
_returnSpec(strdup(returnSpec)),
- _instant(instant),
_method(method),
_handler(handler),
_docLen(0),
@@ -171,7 +170,6 @@ void
FRT_ReflectionBuilder::DefineMethod(const char *name,
const char *paramSpec,
const char *returnSpec,
- bool instant,
FRT_METHOD_PT method,
FRT_Invokable *handler)
{
@@ -182,7 +180,6 @@ FRT_ReflectionBuilder::DefineMethod(const char *name,
_method = new FRT_Method(name,
paramSpec,
returnSpec,
- instant,
method,
handler);
_lookup->AddMethod(_method);
diff --git a/fnet/src/vespa/fnet/frt/reflection.h b/fnet/src/vespa/fnet/frt/reflection.h
index 466e58413e9..5189cf81d0a 100644
--- a/fnet/src/vespa/fnet/frt/reflection.h
+++ b/fnet/src/vespa/fnet/frt/reflection.h
@@ -19,7 +19,6 @@ private:
char *_name; // method name
char *_paramSpec; // method parameter spec
char *_returnSpec; // method return spec
- bool _instant; // method is instant ?
FRT_METHOD_PT _method; // method pointer
FRT_Invokable *_handler; // method handler
uint32_t _docLen; // method documentation length
@@ -32,7 +31,6 @@ public:
FRT_Method(const char *name,
const char *paramSpec,
const char *returnSpec,
- bool instant,
FRT_METHOD_PT method,
FRT_Invokable *handler);
@@ -42,7 +40,6 @@ public:
const char *GetName() { return _name; }
const char *GetParamSpec() { return _paramSpec; }
const char *GetReturnSpec() { return _returnSpec; }
- bool IsInstant() { return _instant; }
FRT_METHOD_PT GetMethod() { return _method; }
FRT_Invokable *GetHandler() { return _handler; }
void SetDocumentation(FRT_Values *values);
@@ -121,7 +118,6 @@ public:
void DefineMethod(const char *name,
const char *paramSpec,
const char *returnSpec,
- bool instant,
FRT_METHOD_PT method,
FRT_Invokable *handler);
void MethodDesc(const char *desc);
diff --git a/fnet/src/vespa/fnet/frt/rpcrequest.h b/fnet/src/vespa/fnet/frt/rpcrequest.h
index a10653ce2f6..cc871e7ac0c 100644
--- a/fnet/src/vespa/fnet/frt/rpcrequest.h
+++ b/fnet/src/vespa/fnet/frt/rpcrequest.h
@@ -133,7 +133,7 @@ public:
FNET_Packet *CreateReplyPacket();
void SetDetachedPT(bool *detachedPT) { _detachedPT = detachedPT; }
- void Detach() { *_detachedPT = true; }
+ FRT_RPCRequest *Detach() { *_detachedPT = true; return this; }
void SetAbortHandler(FRT_IAbortHandler *handler) { _abortHandler = handler; }
void SetReturnHandler(FRT_IReturnHandler *handler) { _returnHandler = handler; }
diff --git a/fnet/src/vespa/fnet/frt/supervisor.cpp b/fnet/src/vespa/fnet/frt/supervisor.cpp
index 927e2e84b94..e509223c005 100644
--- a/fnet/src/vespa/fnet/frt/supervisor.cpp
+++ b/fnet/src/vespa/fnet/frt/supervisor.cpp
@@ -91,22 +91,6 @@ FRT_Supervisor::GetListenPort() const
}
-bool
-FRT_Supervisor::RunInvocation(FRT_RPCInvoker *invoker)
-{
- // XXX: implement queue with max length + max # threads
-
- if (_threadPool == nullptr ||
- _threadPool->NewThread(invoker) == nullptr)
- {
- invoker->GetRequest()->SetError(FRTE_RPC_OVERLOAD,
- "Could not start thread");
- return false;
- }
- return true;
-}
-
-
FRT_Target *
FRT_Supervisor::GetTarget(const char *spec)
{
@@ -179,7 +163,7 @@ FRT_Supervisor::SetMethodMismatchHook(FRT_METHOD_PT method,
{
delete _methodMismatchHook;
_methodMismatchHook = new FRT_Method("frt.hook.methodMismatch", "*", "*",
- true, method, handler);
+ method, handler);
assert(_methodMismatchHook != nullptr);
}
@@ -284,25 +268,17 @@ FRT_Supervisor::HandlePacket(FNET_Packet *packet, FNET_Context context)
&& _methodMismatchHook != nullptr)
{
invoker->ForceMethod(_methodMismatchHook);
- return (invoker->Invoke(false)) ?
+ return (invoker->Invoke()) ?
FNET_FREE_CHANNEL : FNET_CLOSE_CHANNEL;
}
invoker->HandleDone(false);
return FNET_FREE_CHANNEL;
- } else if (invoker->IsInstant()) {
-
- return (invoker->Invoke(false)) ?
- FNET_FREE_CHANNEL : FNET_CLOSE_CHANNEL;
-
} else {
- if (!RunInvocation(invoker)) {
- invoker->HandleDone(false);
- return FNET_FREE_CHANNEL;
- }
- return FNET_CLOSE_CHANNEL;
+ return (invoker->Invoke()) ?
+ FNET_FREE_CHANNEL : FNET_CLOSE_CHANNEL;
}
}
@@ -349,17 +325,17 @@ FRT_Supervisor::RPCHooks::InitRPC(FRT_Supervisor *supervisor)
{
FRT_ReflectionBuilder rb(supervisor);
//---------------------------------------------------------------------------
- rb.DefineMethod("frt.rpc.ping", "", "", true,
+ rb.DefineMethod("frt.rpc.ping", "", "",
FRT_METHOD(FRT_Supervisor::RPCHooks::RPC_Ping), this);
rb.MethodDesc("Method that may be used to check if the server is online");
//---------------------------------------------------------------------------
- rb.DefineMethod("frt.rpc.echo", "*", "*", true,
+ rb.DefineMethod("frt.rpc.echo", "*", "*",
FRT_METHOD(FRT_Supervisor::RPCHooks::RPC_Echo), this);
rb.MethodDesc("Echo the parameters as return values");
rb.ParamDesc("params", "Any set of parameters");
rb.ReturnDesc("return", "The parameter values");
//---------------------------------------------------------------------------
- rb.DefineMethod("frt.rpc.getMethodList", "", "SSS", true,
+ rb.DefineMethod("frt.rpc.getMethodList", "", "SSS",
FRT_METHOD(FRT_Supervisor::RPCHooks::RPC_GetMethodList),
this);
rb.MethodDesc("Obtain a list of all available methods");
@@ -367,7 +343,7 @@ FRT_Supervisor::RPCHooks::InitRPC(FRT_Supervisor *supervisor)
rb.ReturnDesc("params", "Method parameter types");
rb.ReturnDesc("return", "Method return types");
//---------------------------------------------------------------------------
- rb.DefineMethod("frt.rpc.getMethodInfo", "s", "sssSSSS", true,
+ rb.DefineMethod("frt.rpc.getMethodInfo", "s", "sssSSSS",
FRT_METHOD(FRT_Supervisor::RPCHooks::RPC_GetMethodInfo),
this);
rb.MethodDesc("Obtain detailed information about a single method");
@@ -448,7 +424,7 @@ FRT_Supervisor::ConnHooks::SetSessionInitHook(FRT_METHOD_PT method,
{
delete _sessionInitHook;
_sessionInitHook = new FRT_Method("frt.hook.sessionInit", "", "",
- true, method, handler);
+ method, handler);
assert(_sessionInitHook != nullptr);
}
@@ -459,7 +435,7 @@ FRT_Supervisor::ConnHooks::SetSessionDownHook(FRT_METHOD_PT method,
{
delete _sessionDownHook;
_sessionDownHook = new FRT_Method("frt.hook.sessionDown", "", "",
- true, method, handler);
+ method, handler);
assert(_sessionDownHook != nullptr);
}
@@ -470,7 +446,7 @@ FRT_Supervisor::ConnHooks::SetSessionFiniHook(FRT_METHOD_PT method,
{
delete _sessionFiniHook;
_sessionFiniHook = new FRT_Method("frt.hook.sessionFini", "", "",
- true, method, handler);
+ method, handler);
assert(_sessionFiniHook != nullptr);
}
diff --git a/fnet/src/vespa/fnet/frt/supervisor.h b/fnet/src/vespa/fnet/frt/supervisor.h
index 051c1caceeb..dc7fb496239 100644
--- a/fnet/src/vespa/fnet/frt/supervisor.h
+++ b/fnet/src/vespa/fnet/frt/supervisor.h
@@ -99,8 +99,6 @@ public:
bool Listen(int port);
uint32_t GetListenPort() const;
- bool RunInvocation(FRT_RPCInvoker *invoker);
-
FRT_Target *GetTarget(const char *spec);
FRT_Target *Get2WayTarget(const char *spec,
FNET_Context connContext = FNET_Context());
diff --git a/fnet/src/vespa/fnet/iocomponent.cpp b/fnet/src/vespa/fnet/iocomponent.cpp
index 148dabf5c60..d4244cbf204 100644
--- a/fnet/src/vespa/fnet/iocomponent.cpp
+++ b/fnet/src/vespa/fnet/iocomponent.cpp
@@ -12,7 +12,6 @@ FNET_IOComponent::FNET_IOComponent(FNET_TransportThread *owner,
: _ioc_next(nullptr),
_ioc_prev(nullptr),
_ioc_owner(owner),
- _ioc_counters(_ioc_owner->GetStatCounters()),
_ioc_socket_fd(socket_fd),
_ioc_selector(nullptr),
_ioc_spec(nullptr),
diff --git a/fnet/src/vespa/fnet/iocomponent.h b/fnet/src/vespa/fnet/iocomponent.h
index 16ecce2e345..901c3d1a5d0 100644
--- a/fnet/src/vespa/fnet/iocomponent.h
+++ b/fnet/src/vespa/fnet/iocomponent.h
@@ -2,14 +2,12 @@
#pragma once
-#include "stats.h"
#include <vespa/fastos/timestamp.h>
#include <vespa/vespalib/net/selector.h>
#include <mutex>
#include <condition_variable>
class FNET_TransportThread;
-class FNET_StatCounters;
class FNET_Config;
/**
@@ -45,7 +43,6 @@ protected:
FNET_IOComponent *_ioc_next; // next in list
FNET_IOComponent *_ioc_prev; // prev in list
FNET_TransportThread *_ioc_owner; // owner(TransportThread) ref.
- FNET_StatCounters *_ioc_counters; // stat counters
int _ioc_socket_fd; // source of events.
Selector *_ioc_selector; // attached event selector
char *_ioc_spec; // connect/listen spec
@@ -154,47 +151,6 @@ public:
**/
void UpdateTimeOut();
-
- /**
- * Count packet read(s). This is a proxy method updating the stat
- * counters associated with the owning transport object.
- *
- * @param cnt the number of packets read (default is 1).
- **/
- void CountPacketRead(uint32_t cnt = 1)
- { _ioc_counters->CountPacketRead(cnt); }
-
-
- /**
- * Count packet write(s). This is a proxy method updating the stat
- * counters associated with the owning transport object.
- *
- * @param cnt the number of packets written (default is 1).
- **/
- void CountPacketWrite(uint32_t cnt = 1)
- { _ioc_counters->CountPacketWrite(cnt); }
-
-
- /**
- * Count read data. This is a proxy method updating the stat
- * counters associated with the owning transport object.
- *
- * @param bytes the number of bytes read.
- **/
- void CountDataRead(uint32_t bytes)
- { _ioc_counters->CountDataRead(bytes); }
-
-
- /**
- * Count written data. This is a proxy method updating the stat
- * counters associated with the owning transport object.
- *
- * @param bytes the number of bytes written.
- **/
- void CountDataWrite(uint32_t bytes)
- { _ioc_counters->CountDataWrite(bytes); }
-
-
/**
* Attach an event selector to this component. Before deleting an
* IOC, one must first call detach_selector to detach the
diff --git a/fnet/src/vespa/fnet/stats.cpp b/fnet/src/vespa/fnet/stats.cpp
deleted file mode 100644
index f156fe4afe7..00000000000
--- a/fnet/src/vespa/fnet/stats.cpp
+++ /dev/null
@@ -1,98 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include "stats.h"
-
-#include <vespa/log/log.h>
-LOG_SETUP(".fnet");
-
-FNET_StatCounters::FNET_StatCounters()
- : _eventLoopCnt(0),
- _eventCnt(0),
- _ioEventCnt(0),
- _packetReadCnt(0),
- _packetWriteCnt(0),
- _dataReadCnt(0),
- _dataWriteCnt(0)
-{
-}
-
-
-FNET_StatCounters::~FNET_StatCounters()
-{
-}
-
-
-void
-FNET_StatCounters::Clear()
-{
- _eventLoopCnt = 0;
- _eventCnt = 0;
- _ioEventCnt = 0;
- _packetReadCnt = 0;
- _packetWriteCnt = 0;
- _dataReadCnt = 0;
- _dataWriteCnt = 0;
-}
-
-//-----------------------------------------------
-
-FNET_Stats::FNET_Stats()
- : _eventLoopRate(0),
- _eventRate(0),
- _ioEventRate(0),
- _packetReadRate(0),
- _packetWriteRate(0),
- _dataReadRate(0),
- _dataWriteRate(0)
-{
-}
-
-
-FNET_Stats::~FNET_Stats()
-{
-}
-
-
-void
-FNET_Stats::Update(FNET_StatCounters *count, double secs)
-{
- _eventLoopRate = (float)(FNET_STATS_OLD_FACTOR * _eventLoopRate
- + (FNET_STATS_NEW_FACTOR
- * ((double)count->_eventLoopCnt / secs)));
- _eventRate = (float)(FNET_STATS_OLD_FACTOR * _eventRate
- + (FNET_STATS_NEW_FACTOR
- * ((double)count->_eventCnt / secs)));
- _ioEventRate = (float)(FNET_STATS_OLD_FACTOR * _ioEventRate
- + (FNET_STATS_NEW_FACTOR
- * ((double)count->_ioEventCnt / secs)));
-
- _packetReadRate = (float)(FNET_STATS_OLD_FACTOR * _packetReadRate
- + (FNET_STATS_NEW_FACTOR
- * ((double)count->_packetReadCnt / secs)));
- _packetWriteRate = (float)(FNET_STATS_OLD_FACTOR * _packetWriteRate
- + (FNET_STATS_NEW_FACTOR
- * ((double)count->_packetWriteCnt / secs)));
-
- _dataReadRate = (float)(FNET_STATS_OLD_FACTOR * _dataReadRate
- + (FNET_STATS_NEW_FACTOR
- * ((double)count->_dataReadCnt / (1000.0 * secs))));
- _dataWriteRate = (float)(FNET_STATS_OLD_FACTOR * _dataWriteRate
- + (FNET_STATS_NEW_FACTOR
- * ((double)count->_dataWriteCnt / (1000.0 * secs))));
-}
-
-
-void
-FNET_Stats::Log()
-{
- LOG(info, "events[/s][loop/int/io][%.1f/%.1f/%.1f] "
- "packets[/s][r/w][%.1f/%.1f] "
- "data[kB/s][r/w][%.2f/%.2f]",
- _eventLoopRate,
- _eventRate,
- _ioEventRate,
- _packetReadRate,
- _packetWriteRate,
- _dataReadRate,
- _dataWriteRate);
-}
diff --git a/fnet/src/vespa/fnet/stats.h b/fnet/src/vespa/fnet/stats.h
deleted file mode 100644
index 76651393165..00000000000
--- a/fnet/src/vespa/fnet/stats.h
+++ /dev/null
@@ -1,103 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#pragma once
-
-#include <cstdint>
-
-/**
- * This class is used internally by @ref FNET_Transport objects to
- * aggregate FNET statistics. The actual statistics are located in the
- * @ref FNET_Stats class.
- **/
-class FNET_StatCounters
-{
-public:
- uint32_t _eventLoopCnt; // # event loop iterations
- uint32_t _eventCnt; // # internal events
- uint32_t _ioEventCnt; // # IO events
- uint32_t _packetReadCnt; // # packets read
- uint32_t _packetWriteCnt; // # packets written
- uint32_t _dataReadCnt; // # bytes read
- uint32_t _dataWriteCnt; // # bytes written
-
- FNET_StatCounters();
- ~FNET_StatCounters();
-
- void Clear();
- void CountEventLoop(uint32_t cnt) { _eventLoopCnt += cnt; }
- void CountEvent(uint32_t cnt) { _eventCnt += cnt; }
- void CountIOEvent(uint32_t cnt) { _ioEventCnt += cnt; }
- void CountPacketRead(uint32_t cnt) { _packetReadCnt += cnt; }
- void CountPacketWrite(uint32_t cnt) { _packetWriteCnt += cnt; }
- void CountDataRead(uint32_t bytes) { _dataReadCnt += bytes; }
- void CountDataWrite(uint32_t bytes) { _dataWriteCnt += bytes; }
-};
-
-//-----------------------------------------------
-
-#define FNET_STATS_OLD_FACTOR 0.5
-#define FNET_STATS_NEW_FACTOR 0.5
-
-/**
- * This class contains various FNET statistics. The statistics for a
- * @ref FNET_Transport object may be obtained by invoking the GetStats
- * method on it.
- **/
-class FNET_Stats
-{
-public:
- /**
- * Event loop iterations per second.
- **/
- float _eventLoopRate; // loop iterations/s
-
- /**
- * Internal events handled per second.
- **/
- float _eventRate; // internal-events/s
-
- /**
- * IO events handled per second.
- **/
- float _ioEventRate; // IO-events/s
-
- /**
- * Packets read per second.
- **/
- float _packetReadRate; // packets/s
-
- /**
- * Packets written per second.
- **/
- float _packetWriteRate; // packets/s
-
- /**
- * Data read per second (in kB).
- **/
- float _dataReadRate; // kB/s
-
- /**
- * Data written per second (in kB).
- **/
- float _dataWriteRate; // kB/s
-
- FNET_Stats();
- ~FNET_Stats();
-
- /**
- * Update statistics. The new statistics are calculated based on
- * both the current values and the input count structure indicating
- * what has happened since the last statistics update.
- *
- * @param count what has happened since last statistics update.
- * @param secs number of seconds since last statistics update.
- **/
- void Update(FNET_StatCounters *count, double secs);
-
- /**
- * Invoking this method will generate a log message of type
- * FNET_INFO showing the values held by this object.
- **/
- void Log();
-};
-
diff --git a/fnet/src/vespa/fnet/transport.cpp b/fnet/src/vespa/fnet/transport.cpp
index 55c847682c4..dfeb8d03436 100644
--- a/fnet/src/vespa/fnet/transport.cpp
+++ b/fnet/src/vespa/fnet/transport.cpp
@@ -120,14 +120,6 @@ FNET_Transport::SetTCPNoDelay(bool noDelay)
}
void
-FNET_Transport::SetLogStats(bool logStats)
-{
- for (const auto &thread: _threads) {
- thread->SetLogStats(logStats);
- }
-}
-
-void
FNET_Transport::sync()
{
for (const auto &thread: _threads) {
diff --git a/fnet/src/vespa/fnet/transport.h b/fnet/src/vespa/fnet/transport.h
index dbf914798fd..15e69bd66a6 100644
--- a/fnet/src/vespa/fnet/transport.h
+++ b/fnet/src/vespa/fnet/transport.h
@@ -195,14 +195,6 @@ public:
void SetTCPNoDelay(bool noDelay);
/**
- * Enable or disable logging of FNET statistics. This feature is
- * disabled by default.
- *
- * @param logStats true if stats should be logged.
- **/
- void SetLogStats(bool logStats);
-
- /**
* Synchronize with all transport threads. This method will block
* until all events posted before this method was invoked has been
* processed. If a transport thread has been shut down (or is in
diff --git a/fnet/src/vespa/fnet/transport_thread.cpp b/fnet/src/vespa/fnet/transport_thread.cpp
index 34ab9091072..b0388bdc140 100644
--- a/fnet/src/vespa/fnet/transport_thread.cpp
+++ b/fnet/src/vespa/fnet/transport_thread.cpp
@@ -31,15 +31,6 @@ struct Sync : public FNET_IExecutable
} // namespace<unnamed>
-#ifndef IAM_DOXYGEN
-void
-FNET_TransportThread::StatsTask::PerformTask()
-{
- _transport->UpdateStats();
- Schedule(5.0);
-}
-#endif
-
void
FNET_TransportThread::AddComponent(FNET_IOComponent *comp)
{
@@ -160,22 +151,6 @@ FNET_TransportThread::DiscardEvent(FNET_ControlPacket *cpacket,
}
-void
-FNET_TransportThread::UpdateStats()
-{
- _now.SetNow(); // trade some overhead for better stats
- double ms = _now.MilliSecs() - _statTime.MilliSecs();
- _statTime = _now;
- {
- std::lock_guard<std::mutex> guard(_lock);
- _stats.Update(&_counters, ms / 1000.0);
- }
- _counters.Clear();
-
- if (_config._logStats)
- _stats.Log();
-}
-
extern "C" {
static void pipehandler(int)
@@ -203,10 +178,6 @@ FNET_TransportThread::FNET_TransportThread(FNET_Transport &owner_in)
_startTime(),
_now(),
_scheduler(&_now),
- _counters(),
- _stats(),
- _statsTask(&_scheduler, this),
- _statTime(),
_config(),
_componentsHead(nullptr),
_timeOutHead(nullptr),
@@ -424,8 +395,6 @@ FNET_TransportThread::InitEventLoop()
}
_now.SetNow();
_startTime = _now;
- _statTime = _now;
- _statsTask.Schedule(5.0);
return true;
}
@@ -435,7 +404,7 @@ FNET_TransportThread::handle_wakeup()
{
{
std::lock_guard<std::mutex> guard(_lock);
- CountEvent(_queue.FlushPackets_NoLock(&_myQueue));
+ _queue.FlushPackets_NoLock(&_myQueue);
}
FNET_Context context;
@@ -534,7 +503,6 @@ FNET_TransportThread::EventLoopIteration()
// obtain I/O events
_selector.poll(msTimeout);
- CountEventLoop();
// sample current time (performed once per event loop iteration)
_now.SetNow();
@@ -548,7 +516,6 @@ FNET_TransportThread::EventLoopIteration()
#endif
// handle wakeup and io-events
- CountIOEvent(_selector.num_events());
_selector.dispatch(*this);
// handle IOC time-outs
@@ -579,9 +546,6 @@ FNET_TransportThread::EventLoopIteration()
if (_finished)
return false;
- // unschedule statistics task
- _statsTask.Kill();
-
// flush event queue
{
std::lock_guard<std::mutex> guard(_lock);
diff --git a/fnet/src/vespa/fnet/transport_thread.h b/fnet/src/vespa/fnet/transport_thread.h
index 3e5fe49e73a..1b8d1fa4eeb 100644
--- a/fnet/src/vespa/fnet/transport_thread.h
+++ b/fnet/src/vespa/fnet/transport_thread.h
@@ -6,7 +6,6 @@
#include "config.h"
#include "task.h"
#include "packetqueue.h"
-#include "stats.h"
#include <vespa/fastos/thread.h>
#include <vespa/fastos/time.h>
#include <vespa/vespalib/net/socket_handle.h>
@@ -31,31 +30,11 @@ class FNET_TransportThread : public FastOS_Runnable
public:
using Selector = vespalib::Selector<FNET_IOComponent>;
-#ifndef IAM_DOXYGEN
- class StatsTask : public FNET_Task
- {
- private:
- FNET_TransportThread *_transport;
- StatsTask(const StatsTask &);
- StatsTask &operator=(const StatsTask &);
- public:
- StatsTask(FNET_Scheduler *scheduler,
- FNET_TransportThread *transport) : FNET_Task(scheduler),
- _transport(transport) {}
- void PerformTask() override;
- };
- friend class FNET_TransportThread::StatsTask;
-#endif // DOXYGEN
-
private:
FNET_Transport &_owner; // owning transport layer
FastOS_Time _startTime; // when event loop started
FastOS_Time _now; // current time sampler
FNET_Scheduler _scheduler; // transport thread scheduler
- FNET_StatCounters _counters; // stat counters
- FNET_Stats _stats; // current stats
- StatsTask _statsTask; // stats task
- FastOS_Time _statTime; // last stat update
FNET_Config _config; // FNET configuration [static]
FNET_IOComponent *_componentsHead; // I/O component list head
FNET_IOComponent *_timeOutHead; // first IOC in list to time out
@@ -156,49 +135,6 @@ private:
/**
- * Update internal FNET statistics. This method is called regularly
- * by the statistics update task.
- **/
- void UpdateStats();
-
-
- /**
- * Obtain a reference to the stat counters used by this transport
- * object.
- *
- * @return stat counters for this transport object.
- **/
- FNET_StatCounters *GetStatCounters() { return &_counters; }
-
-
- /**
- * Count event loop iteration(s).
- *
- * @param cnt event loop iterations (default is 1).
- **/
- void CountEventLoop(uint32_t cnt = 1)
- { _counters.CountEventLoop(cnt); }
-
-
- /**
- * Count internal event(s).
- *
- * @param cnt number of internal events.
- **/
- void CountEvent(uint32_t cnt)
- { _counters.CountEvent(cnt); }
-
-
- /**
- * Count IO events.
- *
- * @param cnt number of IO events.
- **/
- void CountIOEvent(uint32_t cnt)
- { _counters.CountIOEvent(cnt); }
-
-
- /**
* Obtain a reference to the object holding the configuration for
* this transport object.
*
@@ -355,15 +291,6 @@ public:
/**
- * Enable or disable logging of FNET statistics. This feature is
- * disabled by default.
- *
- * @param logStats true if stats should be logged.
- **/
- void SetLogStats(bool logStats) { _config._logStats = logStats; }
-
-
- /**
* Add an I/O component to the working set of this transport
* object. Note that the actual work is performed by the transport
* thread. This method simply posts an event on the transport thread