diff options
Diffstat (limited to 'fnet/src')
34 files changed, 139 insertions, 594 deletions
diff --git a/fnet/src/examples/frt/rpc/rpc_callback_client.cpp b/fnet/src/examples/frt/rpc/rpc_callback_client.cpp index 801de59b515..7c6434e870a 100644 --- a/fnet/src/examples/frt/rpc/rpc_callback_client.cpp +++ b/fnet/src/examples/frt/rpc/rpc_callback_client.cpp @@ -26,7 +26,7 @@ RPC::Init(FRT_Supervisor *s) { FRT_ReflectionBuilder rb(s); //------------------------------------------------------------------- - rb.DefineMethod("prod", "", "", true, + rb.DefineMethod("prod", "", "", FRT_METHOD(RPC::Prod), this); //------------------------------------------------------------------- } @@ -45,6 +45,7 @@ MyApp::Main() printf("usage : rpc_server <connectspec>\n"); return 1; } + bool ok = true; RPC rpc; FRT_Supervisor orb; rpc.Init(&orb); @@ -63,6 +64,7 @@ MyApp::Main() printf("[error(%d): %s]\n", req->GetErrorCode(), req->GetErrorMessage()); + ok = false; } printf("invokeCnt: %d\n", rpc.invokeCnt); @@ -76,6 +78,7 @@ MyApp::Main() printf("[error(%d): %s]\n", req->GetErrorCode(), req->GetErrorMessage()); + ok = false; } printf("invokeCnt: %d\n", rpc.invokeCnt); @@ -89,14 +92,18 @@ MyApp::Main() printf("[error(%d): %s]\n", req->GetErrorCode(), req->GetErrorMessage()); + ok = false; } printf("invokeCnt: %d\n", rpc.invokeCnt); + if (rpc.invokeCnt != 3) { + ok = false; + } req->SubRef(); target->SubRef(); orb.ShutDown(true); - return 0; + return ok ? 0 : 1; } diff --git a/fnet/src/examples/frt/rpc/rpc_callback_server.cpp b/fnet/src/examples/frt/rpc/rpc_callback_server.cpp index 33207282bcc..ac7b34ebda0 100644 --- a/fnet/src/examples/frt/rpc/rpc_callback_server.cpp +++ b/fnet/src/examples/frt/rpc/rpc_callback_server.cpp @@ -2,6 +2,7 @@ #include <vespa/fnet/frt/frt.h> #include <vespa/fastos/app.h> +#include <thread> #include <vespa/log/log.h> LOG_SETUP("rpc_callback_server"); @@ -12,9 +13,7 @@ struct RPC : public FRT_Invokable void Init(FRT_Supervisor *s); }; -void -RPC::CallBack(FRT_RPCRequest *req) -{ +void do_callback(FRT_RPCRequest *req) { FNET_Connection *conn = req->GetConnection(); FRT_RPCRequest *cb = new FRT_RPCRequest(); cb->SetMethodName(req->GetParams()->GetValue(0)._string._str); @@ -25,6 +24,14 @@ RPC::CallBack(FRT_RPCRequest *req) cb->GetErrorMessage()); } cb->SubRef(); + req->Return(); +} + +void +RPC::CallBack(FRT_RPCRequest *req) +{ + req->Detach(); + std::thread(do_callback, req).detach(); } void @@ -32,7 +39,7 @@ RPC::Init(FRT_Supervisor *s) { FRT_ReflectionBuilder rb(s); //------------------------------------------------------------------- - rb.DefineMethod("callBack", "s", "", false, + rb.DefineMethod("callBack", "s", "", FRT_METHOD(RPC::CallBack), this); //------------------------------------------------------------------- } diff --git a/fnet/src/examples/frt/rpc/rpc_server.cpp b/fnet/src/examples/frt/rpc/rpc_server.cpp index 8947663216e..03d618133c9 100644 --- a/fnet/src/examples/frt/rpc/rpc_server.cpp +++ b/fnet/src/examples/frt/rpc/rpc_server.cpp @@ -28,21 +28,21 @@ RPCServer::InitRPC(FRT_Supervisor *s) { FRT_ReflectionBuilder rb(s); //------------------------------------------------------------------- - rb.DefineMethod("concat", "ss", "s", true, + rb.DefineMethod("concat", "ss", "s", FRT_METHOD(RPCServer::RPC_concat), this); rb.MethodDesc("Concatenate two strings"); rb.ParamDesc("string1", "a string"); rb.ParamDesc("string2", "another string"); rb.ReturnDesc("ret", "the concatenation of string1 and string2"); //------------------------------------------------------------------- - rb.DefineMethod("addFloat", "ff", "f", true, + rb.DefineMethod("addFloat", "ff", "f", FRT_METHOD(RPCServer::RPC_addFloat), this); rb.MethodDesc("Add two floats"); rb.ParamDesc("float1", "a float"); rb.ParamDesc("float2", "another float"); rb.ReturnDesc("ret", "float1 + float2"); //------------------------------------------------------------------- - rb.DefineMethod("addDouble", "dd", "d", true, + rb.DefineMethod("addDouble", "dd", "d", FRT_METHOD(RPCServer::RPC_addDouble), this); rb.MethodDesc("Add two doubles"); rb.ParamDesc("double1", "a double"); diff --git a/fnet/src/examples/proxy/proxy.cpp b/fnet/src/examples/proxy/proxy.cpp index 653b445581f..a01a16ead9c 100644 --- a/fnet/src/examples/proxy/proxy.cpp +++ b/fnet/src/examples/proxy/proxy.cpp @@ -227,7 +227,6 @@ Proxy::Main() if (listener != nullptr) listener->SubRef(); - _transport.SetLogStats(true); FNET_SignalShutDown ssd(_transport); _transport.Main(); return 0; diff --git a/fnet/src/tests/frt/method_pt/method_pt.cpp b/fnet/src/tests/frt/method_pt/method_pt.cpp index db5905d6871..5417fddceeb 100644 --- a/fnet/src/tests/frt/method_pt/method_pt.cpp +++ b/fnet/src/tests/frt/method_pt/method_pt.cpp @@ -207,35 +207,35 @@ void initTest() { //------------------------------------------------------------------- - rb.DefineMethod("simpleMethod", "", "", true, + rb.DefineMethod("simpleMethod", "", "", FRT_METHOD(SimpleHandler::RPC_Method), _simpleHandler); //------------------------------------------------------------------- - rb.DefineMethod("mediumMethod1", "", "", true, + rb.DefineMethod("mediumMethod1", "", "", FRT_METHOD(MediumHandler1::RPC_Method), _mediumHandler1); - rb.DefineMethod("mediumMethod2", "", "", true, + rb.DefineMethod("mediumMethod2", "", "", FRT_METHOD(MediumHandler2::RPC_Method), _mediumHandler2); - rb.DefineMethod("mediumMethod3", "", "", true, + rb.DefineMethod("mediumMethod3", "", "", FRT_METHOD(MediumHandler3::RPC_Method), _mediumHandler3); //------------------------------------------------------------------- - rb.DefineMethod("complexMethod1", "", "", true, + rb.DefineMethod("complexMethod1", "", "", FRT_METHOD(ComplexHandler1::RPC_Method), _complexHandler1); - rb.DefineMethod("complexMethod2", "", "", true, + rb.DefineMethod("complexMethod2", "", "", FRT_METHOD(ComplexHandler2::RPC_Method), _complexHandler2); - rb.DefineMethod("complexMethod3", "", "", true, + rb.DefineMethod("complexMethod3", "", "", FRT_METHOD(ComplexHandler3::RPC_Method), _complexHandler3); diff --git a/fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp b/fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp index 4f8b4b82743..31aec84afd5 100644 --- a/fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp +++ b/fnet/src/tests/frt/parallel_rpc/parallel_rpc_test.cpp @@ -3,16 +3,19 @@ #include <vespa/vespalib/util/stringfmt.h> #include <vespa/fnet/frt/frt.h> #include <vespa/vespalib/util/benchmark_timer.h> +#include <vespa/vespalib/net/crypto_engine.h> +#include <vespa/vespalib/net/tls/tls_crypto_engine.h> +#include <vespa/vespalib/test/make_tls_options_for_testing.h> #include <thread> -using vespalib::BenchmarkTimer; +using namespace vespalib; struct Rpc : FRT_Invokable { FastOS_ThreadPool thread_pool; FNET_Transport transport; FRT_Supervisor orb; - Rpc(size_t num_threads) - : thread_pool(128 * 1024), transport(num_threads), orb(&transport, &thread_pool) {} + Rpc(CryptoEngine::SP crypto, size_t num_threads) + : thread_pool(128 * 1024), transport(crypto, num_threads), orb(&transport, &thread_pool) {} void start() { ASSERT_TRUE(transport.Start(&thread_pool)); } @@ -31,13 +34,13 @@ struct Rpc : FRT_Invokable { struct Server : Rpc { uint32_t port; - Server(size_t num_threads) : Rpc(num_threads), port(listen()) { + Server(CryptoEngine::SP crypto, size_t num_threads) : Rpc(crypto, num_threads), port(listen()) { init_rpc(); start(); } void init_rpc() { FRT_ReflectionBuilder rb(&orb); - rb.DefineMethod("inc", "l", "l", true, FRT_METHOD(Server::rpc_inc), this); + rb.DefineMethod("inc", "l", "l", FRT_METHOD(Server::rpc_inc), this); rb.MethodDesc("increment a 64-bit integer"); rb.ParamDesc("in", "an integer (64 bit)"); rb.ReturnDesc("out", "in + 1 (64 bit)"); @@ -51,7 +54,7 @@ struct Server : Rpc { struct Client : Rpc { uint32_t port; - Client(size_t num_threads, const Server &server) : Rpc(num_threads), port(server.port) { + Client(CryptoEngine::SP crypto, size_t num_threads, const Server &server) : Rpc(crypto, num_threads), port(server.port) { start(); } FRT_Target *connect() { return Rpc::connect(port); } @@ -114,10 +117,26 @@ void perform_test(size_t thread_id, Client &client, Result &result) { } } -TEST_MT_FFF("parallel rpc with 1/1 transport threads and 128 user threads", - 128, Server(1), Client(1, f1), Result(num_threads)) { perform_test(thread_id, f2, f3); } +CryptoEngine::SP null_crypto = std::make_shared<NullCryptoEngine>(); +CryptoEngine::SP xor_crypto = std::make_shared<XorCryptoEngine>(); +CryptoEngine::SP tls_crypto = std::make_shared<vespalib::TlsCryptoEngine>(vespalib::test::make_tls_options_for_testing()); -TEST_MT_FFF("parallel rpc with 8/8 transport threads and 128 user threads", - 128, Server(8), Client(8, f1), Result(num_threads)) { perform_test(thread_id, f2, f3); } +TEST_MT_FFF("parallel rpc with 1/1 transport threads and 128 user threads (no encryption)", + 128, Server(null_crypto, 1), Client(null_crypto, 1, f1), Result(num_threads)) { perform_test(thread_id, f2, f3); } + +TEST_MT_FFF("parallel rpc with 1/1 transport threads and 128 user threads (xor encryption)", + 128, Server(xor_crypto, 1), Client(xor_crypto, 1, f1), Result(num_threads)) { perform_test(thread_id, f2, f3); } + +TEST_MT_FFF("parallel rpc with 1/1 transport threads and 128 user threads (tls encryption)", + 128, Server(tls_crypto, 1), Client(tls_crypto, 1, f1), Result(num_threads)) { perform_test(thread_id, f2, f3); } + +TEST_MT_FFF("parallel rpc with 8/8 transport threads and 128 user threads (no encryption)", + 128, Server(null_crypto, 8), Client(null_crypto, 8, f1), Result(num_threads)) { perform_test(thread_id, f2, f3); } + +TEST_MT_FFF("parallel rpc with 8/8 transport threads and 128 user threads (xor encryption)", + 128, Server(xor_crypto, 8), Client(xor_crypto, 8, f1), Result(num_threads)) { perform_test(thread_id, f2, f3); } + +TEST_MT_FFF("parallel rpc with 8/8 transport threads and 128 user threads (tls encryption)", + 128, Server(tls_crypto, 8), Client(tls_crypto, 8, f1), Result(num_threads)) { perform_test(thread_id, f2, f3); } TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/fnet/src/tests/frt/rpc/CMakeLists.txt b/fnet/src/tests/frt/rpc/CMakeLists.txt index f935590ee77..2bacd37686a 100644 --- a/fnet/src/tests/frt/rpc/CMakeLists.txt +++ b/fnet/src/tests/frt/rpc/CMakeLists.txt @@ -7,6 +7,7 @@ vespa_add_executable(fnet_invoke_test_app TEST ) vespa_add_test(NAME fnet_invoke_test_app COMMAND fnet_invoke_test_app) vespa_add_test(NAME fnet_invoke_test_app_xor COMMAND fnet_invoke_test_app ENVIRONMENT "CRYPTOENGINE=xor") +vespa_add_test(NAME fnet_invoke_test_app_tls COMMAND fnet_invoke_test_app ENVIRONMENT "CRYPTOENGINE=tls") vespa_add_executable(fnet_detach_return_invoke_test_app TEST SOURCES detach_return_invoke.cpp @@ -22,6 +23,7 @@ vespa_add_executable(fnet_session_test_app TEST ) vespa_add_test(NAME fnet_session_test_app COMMAND fnet_session_test_app) vespa_add_test(NAME fnet_session_test_app_xor COMMAND fnet_session_test_app ENVIRONMENT "CRYPTOENGINE=xor") +vespa_add_test(NAME fnet_session_test_app_tls COMMAND fnet_session_test_app ENVIRONMENT "CRYPTOENGINE=tls") vespa_add_executable(fnet_sharedblob_test_app TEST SOURCES sharedblob.cpp diff --git a/fnet/src/tests/frt/rpc/detach_return_invoke.cpp b/fnet/src/tests/frt/rpc/detach_return_invoke.cpp index 54a891261c2..ab21c62bb68 100644 --- a/fnet/src/tests/frt/rpc/detach_return_invoke.cpp +++ b/fnet/src/tests/frt/rpc/detach_return_invoke.cpp @@ -20,7 +20,7 @@ struct Server : public FRT_Invokable Server(FRT_Supervisor &s, Receptor &r) : orb(s), receptor(r) { FRT_ReflectionBuilder rb(&s); - rb.DefineMethod("hook", "", "", true, + rb.DefineMethod("hook", "", "", FRT_METHOD(Server::rpc_hook), this); } diff --git a/fnet/src/tests/frt/rpc/invoke.cpp b/fnet/src/tests/frt/rpc/invoke.cpp index 787adb227f9..e3bd662214f 100644 --- a/fnet/src/tests/frt/rpc/invoke.cpp +++ b/fnet/src/tests/frt/rpc/invoke.cpp @@ -124,7 +124,7 @@ public: assert(_echo_stash != nullptr && _echo_args != nullptr); FRT_ReflectionBuilder rb(supervisor); - rb.DefineMethod("echo", "*", "*", true, + rb.DefineMethod("echo", "*", "*", FRT_METHOD(EchoTest::RPC_Echo), this); FRT_Values *args = _echo_args; @@ -225,17 +225,15 @@ public: { FRT_ReflectionBuilder rb(supervisor); - rb.DefineMethod("inc", "i", "i", true, + rb.DefineMethod("inc", "i", "i", FRT_METHOD(TestRPC::RPC_Inc), this); - rb.DefineMethod("setValue", "i", "", true, + rb.DefineMethod("setValue", "i", "", FRT_METHOD(TestRPC::RPC_SetValue), this); - rb.DefineMethod("incValue", "", "", true, + rb.DefineMethod("incValue", "", "", FRT_METHOD(TestRPC::RPC_IncValue), this); - rb.DefineMethod("getValue", "", "i", true, + rb.DefineMethod("getValue", "", "i", FRT_METHOD(TestRPC::RPC_GetValue), this); - rb.DefineMethod("testFast", "iiibb", "i", true, - FRT_METHOD(TestRPC::RPC_Test), this); - rb.DefineMethod("testSlow", "iiibb", "i", false, + rb.DefineMethod("testFast", "iiibb", "i", FRT_METHOD(TestRPC::RPC_Test), this); } @@ -364,7 +362,6 @@ const char phase_names[PHASE_ZZZ][32] = enum { TIMING_NULL = 0, TIMING_INSTANT, - TIMING_NON_INSTANT, TIMING_ZZZ }; @@ -372,7 +369,6 @@ const char timing_names[TIMING_ZZZ][32] = { "nullptr", "INSTANT", - "NON-INSTANT" }; enum { @@ -451,17 +447,10 @@ struct State { void PrepareTestMethod() { NewReq(); - bool instant = (_timing == TIMING_INSTANT); - if (_timing != TIMING_INSTANT && - _timing != TIMING_NON_INSTANT) - { + if (_timing != TIMING_INSTANT) { ASSERT_TRUE(false); // consult your dealer... } - if (instant) { - _req->SetMethodName("testFast"); - } else { - _req->SetMethodName("testSlow"); - } + _req->SetMethodName("testFast"); } void SetTestParams(uint32_t value, uint32_t delay, @@ -928,9 +917,9 @@ TEST_F("invoke test", State()) { EXPECT_TRUE(_phase_simple_cnt == 1); EXPECT_TRUE(_phase_void_cnt == 1); EXPECT_TRUE(_phase_speed_cnt == 1); - EXPECT_TRUE(_phase_advanced_cnt == 4); - EXPECT_TRUE(_phase_error_cnt == 4); - EXPECT_TRUE(_phase_abort_cnt == 4); + EXPECT_TRUE(_phase_advanced_cnt == 2); + EXPECT_TRUE(_phase_error_cnt == 2); + EXPECT_TRUE(_phase_abort_cnt == 2); EXPECT_TRUE(_phase_echo_cnt == 1); } diff --git a/fnet/src/tests/frt/rpc/my_crypto_engine.hpp b/fnet/src/tests/frt/rpc/my_crypto_engine.hpp index 6f573e5695a..6cd8d47e917 100644 --- a/fnet/src/tests/frt/rpc/my_crypto_engine.hpp +++ b/fnet/src/tests/frt/rpc/my_crypto_engine.hpp @@ -1,15 +1,21 @@ // Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/vespalib/net/tls/tls_crypto_engine.h> +#include <vespa/vespalib/test/make_tls_options_for_testing.h> + vespalib::CryptoEngine::SP my_crypto_engine() { const char *env_str = getenv("CRYPTOENGINE"); if (!env_str) { - fprintf(stderr, "crypto engine: default\n"); - return vespalib::CryptoEngine::get_default(); + fprintf(stderr, "crypto engine: null\n"); + return std::make_shared<vespalib::NullCryptoEngine>(); } std::string engine(env_str); if (engine == "xor") { fprintf(stderr, "crypto engine: xor\n"); return std::make_shared<vespalib::XorCryptoEngine>(); + } else if (engine == "tls") { + fprintf(stderr, "crypto engine: tls\n"); + return std::make_shared<vespalib::TlsCryptoEngine>(vespalib::test::make_tls_options_for_testing()); } TEST_FATAL(("invalid crypto engine: " + engine).c_str()); abort(); diff --git a/fnet/src/tests/frt/rpc/session.cpp b/fnet/src/tests/frt/rpc/session.cpp index b84db9b4e88..93f14647e21 100644 --- a/fnet/src/tests/frt/rpc/session.cpp +++ b/fnet/src/tests/frt/rpc/session.cpp @@ -77,9 +77,9 @@ struct RPC : public FRT_Invokable void Init(FRT_Supervisor *s) { FRT_ReflectionBuilder rb(s); - rb.DefineMethod("getValue", "", "i", true, + rb.DefineMethod("getValue", "", "i", FRT_METHOD(RPC::GetValue), this); - rb.DefineMethod("setValue", "i", "", true, + rb.DefineMethod("setValue", "i", "", FRT_METHOD(RPC::SetValue), this); s->SetSessionInitHook(FRT_METHOD(RPC::InitSession), this); s->SetSessionFiniHook(FRT_METHOD(RPC::FiniSession), this); diff --git a/fnet/src/tests/frt/rpc/sharedblob.cpp b/fnet/src/tests/frt/rpc/sharedblob.cpp index 10eaad9c013..a48ecbb1da7 100644 --- a/fnet/src/tests/frt/rpc/sharedblob.cpp +++ b/fnet/src/tests/frt/rpc/sharedblob.cpp @@ -176,7 +176,7 @@ TEST("testImplicitShared") { ServerSampler serverSampler(dataSet, req); { FRT_ReflectionBuilder rb(&orb); - rb.DefineMethod("test", "*", "*", true, + rb.DefineMethod("test", "*", "*", FRT_METHOD(ServerSampler::RPC_test), &serverSampler); } orb.Listen(0); diff --git a/fnet/src/tests/info/info.cpp b/fnet/src/tests/info/info.cpp index f22c1402437..f76e66c2af6 100644 --- a/fnet/src/tests/info/info.cpp +++ b/fnet/src/tests/info/info.cpp @@ -24,7 +24,7 @@ struct RPC : public FRT_Invokable { FRT_ReflectionBuilder rb(s); //------------------------------------------------------------------- - rb.DefineMethod("getInfo", "", "sssii", true, + rb.DefineMethod("getInfo", "", "sssii", FRT_METHOD(RPC::GetInfo), this); // FastOS version // FNET version @@ -70,10 +70,10 @@ TEST("info") { TEST("size of important objects") { - EXPECT_EQUAL(176u, sizeof(FNET_IOComponent)); + EXPECT_EQUAL(168u, sizeof(FNET_IOComponent)); EXPECT_EQUAL(32u, sizeof(FNET_Channel)); EXPECT_EQUAL(40u, sizeof(FNET_PacketQueue_NoLock)); - EXPECT_EQUAL(480u, sizeof(FNET_Connection)); + EXPECT_EQUAL(472u, sizeof(FNET_Connection)); EXPECT_EQUAL(48u, sizeof(std::condition_variable)); EXPECT_EQUAL(56u, sizeof(FNET_DataBuffer)); EXPECT_EQUAL(24u, sizeof(FastOS_Time)); diff --git a/fnet/src/vespa/fnet/CMakeLists.txt b/fnet/src/vespa/fnet/CMakeLists.txt index 20badc5c489..4b9d818e5ed 100644 --- a/fnet/src/vespa/fnet/CMakeLists.txt +++ b/fnet/src/vespa/fnet/CMakeLists.txt @@ -17,7 +17,6 @@ vespa_add_library(fnet scheduler.cpp signalshutdown.cpp simplepacketstreamer.cpp - stats.cpp task.cpp transport.cpp transport_thread.cpp diff --git a/fnet/src/vespa/fnet/config.cpp b/fnet/src/vespa/fnet/config.cpp index feed7f2d241..a546d38f78b 100644 --- a/fnet/src/vespa/fnet/config.cpp +++ b/fnet/src/vespa/fnet/config.cpp @@ -3,11 +3,9 @@ #include "config.h" FNET_Config::FNET_Config() - : _minEventTimeOut(0), - _pingInterval(0), - _iocTimeOut(0), + : _iocTimeOut(0), _maxInputBufferSize(0x10000), _maxOutputBufferSize(0x10000), - _tcpNoDelay(true), - _logStats(false) -{ } + _tcpNoDelay(true) +{ +} diff --git a/fnet/src/vespa/fnet/config.h b/fnet/src/vespa/fnet/config.h index e94cf0f6105..3f34c1511b6 100644 --- a/fnet/src/vespa/fnet/config.h +++ b/fnet/src/vespa/fnet/config.h @@ -11,14 +11,10 @@ class FNET_Config { public: - uint32_t _minEventTimeOut; - uint32_t _pingInterval; uint32_t _iocTimeOut; uint32_t _maxInputBufferSize; uint32_t _maxOutputBufferSize; bool _tcpNoDelay; - bool _logStats; FNET_Config(); }; - diff --git a/fnet/src/vespa/fnet/connection.cpp b/fnet/src/vespa/fnet/connection.cpp index b9db8a46a4f..f2864d1dd58 100644 --- a/fnet/src/vespa/fnet/connection.cpp +++ b/fnet/src/vespa/fnet/connection.cpp @@ -110,13 +110,6 @@ FNET_Connection::SetState(State state) } if (oldstate < FNET_CLOSING && state >= FNET_CLOSING) { - if (_flags._writeLock) { - _flags._discarding = true; - while (_flags._writeLock) - _ioc_cond.wait(guard); - _flags._discarding = false; - } - while (!_queue.IsEmpty_NoLock() || !_myQueue.IsEmpty_NoLock()) { _flags._discarding = true; _queue.FlushPackets_NoLock(&_myQueue); @@ -233,14 +226,13 @@ FNET_Connection::handshake() EnableReadEvent(true); EnableWriteEvent(writePendingAfterConnect()); size_t chunk_size = std::max(size_t(FNET_READ_SIZE), _socket->min_read_buffer_size()); - uint32_t ignore_stats = 0; ssize_t res = 0; do { // drain input pipeline _input.EnsureFree(chunk_size); res = _socket->drain(_input.GetFree(), _input.GetFreeLen()); if (res > 0) { _input.FreeToData((uint32_t)res); - broken = !handle_packets(ignore_stats); + broken = !handle_packets(); _input.resetIfEmpty(); } } while ((res > 0) && !broken); } @@ -258,7 +250,7 @@ FNET_Connection::handshake() } bool -FNET_Connection::handle_packets(uint32_t &read_packets) +FNET_Connection::handle_packets() { bool broken = false; for (bool done = false; !done;) { // handle each complete packet in the buffer. @@ -268,7 +260,6 @@ FNET_Connection::handle_packets(uint32_t &read_packets) &broken); } if (_flags._gotheader && (_input.GetDataLen() >= _packetLength)) { - read_packets++; HandlePacket(_packetLength, _packetCode, _packetCHID); _flags._gotheader = false; // reset header flag. } else { @@ -282,26 +273,26 @@ bool FNET_Connection::Read() { size_t chunk_size = std::max(size_t(FNET_READ_SIZE), _socket->min_read_buffer_size()); - uint32_t readData = 0; // total data read - uint32_t readPackets = 0; // total packets read int readCnt = 0; // read count bool broken = false; // is this conn broken ? + int my_errno = 0; // sample and preserve errno ssize_t res; // single read result _input.EnsureFree(chunk_size); res = _socket->read(_input.GetFree(), _input.GetFreeLen()); + my_errno = errno; readCnt++; while (res > 0) { _input.FreeToData((uint32_t)res); - readData += (uint32_t)res; - broken = !handle_packets(readPackets); + broken = !handle_packets(); _input.resetIfEmpty(); if (broken || (_input.GetFreeLen() > 0) || (readCnt >= FNET_READ_REDO)) { goto done_read; } _input.EnsureFree(chunk_size); res = _socket->read(_input.GetFree(), _input.GetFreeLen()); + my_errno = errno; readCnt++; } @@ -310,28 +301,24 @@ done_read: while ((res > 0) && !broken) { // drain input pipeline _input.EnsureFree(chunk_size); res = _socket->drain(_input.GetFree(), _input.GetFreeLen()); + my_errno = errno; readCnt++; if (res > 0) { _input.FreeToData((uint32_t)res); - readData += (uint32_t)res; - broken = !handle_packets(readPackets); + broken = !handle_packets(); _input.resetIfEmpty(); } else if (res == 0) { // fully drained -> EWOULDBLOCK - errno = EWOULDBLOCK; + my_errno = EWOULDBLOCK; res = -1; } } - if (readData > 0) { - UpdateTimeOut(); - CountDataRead(readData); - CountPacketRead(readPackets); - uint32_t maxSize = GetConfig()->_maxInputBufferSize; - if (maxSize > 0 && _input.GetBufSize() > maxSize) - { - if (!_flags._gotheader || _packetLength < maxSize) { - _input.Shrink(maxSize); - } + UpdateTimeOut(); + uint32_t maxSize = GetConfig()->_maxInputBufferSize; + if (maxSize > 0 && _input.GetBufSize() > maxSize) + { + if (!_flags._gotheader || _packetLength < maxSize) { + _input.Shrink(maxSize); } } @@ -339,9 +326,9 @@ done_read: if (res == 0) { broken = true; // handle EOF } else { // res < 0 - broken = ((errno != EWOULDBLOCK) && (errno != EAGAIN)); - if (broken && (errno != ECONNRESET)) { - LOG(debug, "Connection(%s): read error: %d", GetSpec(), errno); + broken = ((my_errno != EWOULDBLOCK) && (my_errno != EAGAIN)); + if (broken && (my_errno != ECONNRESET)) { + LOG(debug, "Connection(%s): read error: %d", GetSpec(), my_errno); } } } @@ -354,10 +341,9 @@ bool FNET_Connection::Write() { uint32_t my_write_work = 0; - uint32_t writtenData = 0; // total data written - uint32_t writtenPackets = 0; // total packets written int writeCnt = 0; // write count bool broken = false; // is this conn broken ? + int my_errno = 0; // sample and preserve errno ssize_t res; // single write result FNET_Packet *packet; @@ -374,7 +360,6 @@ FNET_Connection::Write() packet = _myQueue.DequeuePacket_NoLock(&context); if (packet->IsRegularPacket()) { // ignore non-regular packets _streamer->Encode(packet, context._value.INT, &_output); - writtenPackets++; } packet->Free(); } @@ -387,10 +372,10 @@ FNET_Connection::Write() // write data res = _socket->write(_output.GetData(), _output.GetDataLen()); + my_errno = errno; writeCnt++; if (res > 0) { _output.DataToDead((uint32_t)res); - writtenData += (uint32_t)res; _output.resetIfEmpty(); } } while (res > 0 && @@ -404,26 +389,26 @@ FNET_Connection::Write() if (res >= 0) { // flush output pipeline res = _socket->flush(); + my_errno = errno; while (res > 0) { res = _socket->flush(); + my_errno = errno; } } - if (writtenData > 0) { - uint32_t maxSize = GetConfig()->_maxOutputBufferSize; - if (maxSize > 0 && _output.GetBufSize() > maxSize) { - _output.Shrink(maxSize); - } + uint32_t maxSize = GetConfig()->_maxOutputBufferSize; + if (maxSize > 0 && _output.GetBufSize() > maxSize) { + _output.Shrink(maxSize); } if (res < 0) { - if ((errno == EWOULDBLOCK) || (errno == EAGAIN)) { + if ((my_errno == EWOULDBLOCK) || (my_errno == EAGAIN)) { ++my_write_work; // incomplete write/flush } else { broken = true; } - if (broken && (errno != ECONNRESET)) { - LOG(debug, "Connection(%s): write error: %d", GetSpec(), errno); + if (broken && (my_errno != ECONNRESET)) { + LOG(debug, "Connection(%s): write error: %d", GetSpec(), my_errno); } } @@ -431,17 +416,9 @@ FNET_Connection::Write() _writeWork = _queue.GetPacketCnt_NoLock() + _myQueue.GetPacketCnt_NoLock() + my_write_work; - _flags._writeLock = false; - if (_flags._discarding) { - _ioc_cond.notify_all(); - } bool writePending = (_writeWork > 0); guard.unlock(); - if (writtenData > 0) { - CountDataWrite(writtenData); - CountPacketWrite(writtenPackets); - } if (!writePending) EnableWriteEvent(false); @@ -528,7 +505,6 @@ FNET_Connection::~FNET_Connection() delete _adminChannel; } assert(_cleanup == nullptr); - assert(!_flags._writeLock); } @@ -682,9 +658,7 @@ FNET_Connection::PostPacket(FNET_Packet *packet, uint32_t chid) writeWork = _writeWork; _writeWork++; _queue.QueuePacket_NoLock(packet, FNET_Context(chid)); - if (writeWork == 0 && !_flags._writeLock && - _state == FNET_CONNECTED) - { + if ((writeWork == 0) && (_state == FNET_CONNECTED)) { AddRef_NoLock(); guard.unlock(); Owner()->EnableWrite(this, /* needRef = */ false); @@ -693,14 +667,6 @@ FNET_Connection::PostPacket(FNET_Packet *packet, uint32_t chid) } -uint32_t -FNET_Connection::GetQueueLen() -{ - std::lock_guard<std::mutex> guard(_ioc_lock); - return _queue.GetPacketCnt_NoLock() + _myQueue.GetPacketCnt_NoLock(); -} - - void FNET_Connection::Sync() { @@ -774,12 +740,6 @@ FNET_Connection::HandleWriteEvent() case FNET_CONNECTED: { std::unique_lock<std::mutex> guard(_ioc_lock); - if (_flags._writeLock) { - guard.unlock(); - EnableWriteEvent(false); - return true; - } - _flags._writeLock = true; _queue.FlushPackets_NoLock(&_myQueue); } broken = !Write(); diff --git a/fnet/src/vespa/fnet/connection.h b/fnet/src/vespa/fnet/connection.h index 8e5e6280fab..8e275d68b18 100644 --- a/fnet/src/vespa/fnet/connection.h +++ b/fnet/src/vespa/fnet/connection.h @@ -68,13 +68,11 @@ private: struct Flags { Flags() : _gotheader(false), - _writeLock(false), _inCallback(false), _callbackWait(false), _discarding(false) { } bool _gotheader; - bool _writeLock; bool _inCallback; bool _callbackWait; bool _discarding; @@ -212,9 +210,8 @@ private: * for each one. * * @return false if socket is broken. - * @param read_packets count read packets here **/ - bool handle_packets(uint32_t &read_packets); + bool handle_packets(); /** * Read incoming data from socket. @@ -450,19 +447,6 @@ public: /** - * Obtain the number of packets located in the output queue for this - * connection. Note that this number is volatile and should only be - * used as an estimate. Also note that since a queue latching - * strategy is used, this method requires a mutex lock/unlock and is - * therefore not as cheap as may be expected. - * - * @return number of packets currently located in the output queue - * for this connection. - **/ - uint32_t GetQueueLen(); - - - /** * Sync with this connection. When this method is invoked it will * block until all packets currently posted on this connection is * encoded into the output buffer. Also, the amount of data in the diff --git a/fnet/src/vespa/fnet/fnet.h b/fnet/src/vespa/fnet/fnet.h index 5a3a8b28942..c7570e025ec 100644 --- a/fnet/src/vespa/fnet/fnet.h +++ b/fnet/src/vespa/fnet/fnet.h @@ -32,8 +32,6 @@ class FNET_Packet; class FNET_PacketQueue; class FNET_Scheduler; class FNET_SimplePacketStreamer; -class FNET_StatCounters; -class FNET_Stats; class FNET_Task; class FNET_Transport; class FNET_TransportThread; @@ -52,7 +50,6 @@ class FNET_TransportThread; #include "task.h" #include "scheduler.h" #include "config.h" -#include "stats.h" #include "databuffer.h" #include "packet.h" #include "dummypacket.h" diff --git a/fnet/src/vespa/fnet/frt/invoker.cpp b/fnet/src/vespa/fnet/frt/invoker.cpp index f2dc331c707..b174c3a710e 100644 --- a/fnet/src/vespa/fnet/frt/invoker.cpp +++ b/fnet/src/vespa/fnet/frt/invoker.cpp @@ -64,18 +64,14 @@ FRT_RPCInvoker::FRT_RPCInvoker(FRT_Supervisor *supervisor, req->SetReturnHandler(this); } -bool FRT_RPCInvoker::IsInstant() { - return _method->IsInstant(); -} - -bool FRT_RPCInvoker::Invoke(bool freeChannel) +bool FRT_RPCInvoker::Invoke() { bool detached = false; _req->SetDetachedPT(&detached); (_method->GetHandler()->*_method->GetMethod())(_req); if (detached) return false; - HandleDone(freeChannel); + HandleDone(false); return true; } @@ -120,13 +116,6 @@ FRT_RPCInvoker::GetConnection() return _req->GetContext()._value.CHANNEL->GetConnection(); } - -void -FRT_RPCInvoker::Run(FastOS_ThreadInterface *, void *) -{ - Invoke(true); -} - //----------------------------------------------------------------------------- void FRT_HookInvoker::Invoke() diff --git a/fnet/src/vespa/fnet/frt/invoker.h b/fnet/src/vespa/fnet/frt/invoker.h index 15d74017200..64adf66688e 100644 --- a/fnet/src/vespa/fnet/frt/invoker.h +++ b/fnet/src/vespa/fnet/frt/invoker.h @@ -59,8 +59,7 @@ public: //----------------------------------------------------------------------------- -class FRT_RPCInvoker : public FastOS_Runnable, - public FRT_IReturnHandler +class FRT_RPCInvoker : public FRT_IReturnHandler { private: FRT_RPCRequest *_req; @@ -76,15 +75,13 @@ public: bool noReply); void ForceMethod(FRT_Method *method) { _method = method; } - bool IsInstant(); FRT_RPCRequest *GetRequest() { return _req; } void HandleDone(bool freeChannel); - bool Invoke(bool freeChannel); + bool Invoke(); void HandleReturn() override; FNET_Connection *GetConnection() override; - void Run(FastOS_ThreadInterface *, void *) override; }; //----------------------------------------------------------------------------- diff --git a/fnet/src/vespa/fnet/frt/reflection.cpp b/fnet/src/vespa/fnet/frt/reflection.cpp index 4285c512ebf..305294f4a3c 100644 --- a/fnet/src/vespa/fnet/frt/reflection.cpp +++ b/fnet/src/vespa/fnet/frt/reflection.cpp @@ -6,13 +6,12 @@ #include "supervisor.h" FRT_Method::FRT_Method(const char * name, const char * paramSpec, const char * returnSpec, - bool instant, FRT_METHOD_PT method, FRT_Invokable * handler) + FRT_METHOD_PT method, FRT_Invokable * handler) : _hashNext(nullptr), _listNext(nullptr), _name(strdup(name)), _paramSpec(strdup(paramSpec)), _returnSpec(strdup(returnSpec)), - _instant(instant), _method(method), _handler(handler), _docLen(0), @@ -171,7 +170,6 @@ void FRT_ReflectionBuilder::DefineMethod(const char *name, const char *paramSpec, const char *returnSpec, - bool instant, FRT_METHOD_PT method, FRT_Invokable *handler) { @@ -182,7 +180,6 @@ FRT_ReflectionBuilder::DefineMethod(const char *name, _method = new FRT_Method(name, paramSpec, returnSpec, - instant, method, handler); _lookup->AddMethod(_method); diff --git a/fnet/src/vespa/fnet/frt/reflection.h b/fnet/src/vespa/fnet/frt/reflection.h index 466e58413e9..5189cf81d0a 100644 --- a/fnet/src/vespa/fnet/frt/reflection.h +++ b/fnet/src/vespa/fnet/frt/reflection.h @@ -19,7 +19,6 @@ private: char *_name; // method name char *_paramSpec; // method parameter spec char *_returnSpec; // method return spec - bool _instant; // method is instant ? FRT_METHOD_PT _method; // method pointer FRT_Invokable *_handler; // method handler uint32_t _docLen; // method documentation length @@ -32,7 +31,6 @@ public: FRT_Method(const char *name, const char *paramSpec, const char *returnSpec, - bool instant, FRT_METHOD_PT method, FRT_Invokable *handler); @@ -42,7 +40,6 @@ public: const char *GetName() { return _name; } const char *GetParamSpec() { return _paramSpec; } const char *GetReturnSpec() { return _returnSpec; } - bool IsInstant() { return _instant; } FRT_METHOD_PT GetMethod() { return _method; } FRT_Invokable *GetHandler() { return _handler; } void SetDocumentation(FRT_Values *values); @@ -121,7 +118,6 @@ public: void DefineMethod(const char *name, const char *paramSpec, const char *returnSpec, - bool instant, FRT_METHOD_PT method, FRT_Invokable *handler); void MethodDesc(const char *desc); diff --git a/fnet/src/vespa/fnet/frt/rpcrequest.h b/fnet/src/vespa/fnet/frt/rpcrequest.h index a10653ce2f6..cc871e7ac0c 100644 --- a/fnet/src/vespa/fnet/frt/rpcrequest.h +++ b/fnet/src/vespa/fnet/frt/rpcrequest.h @@ -133,7 +133,7 @@ public: FNET_Packet *CreateReplyPacket(); void SetDetachedPT(bool *detachedPT) { _detachedPT = detachedPT; } - void Detach() { *_detachedPT = true; } + FRT_RPCRequest *Detach() { *_detachedPT = true; return this; } void SetAbortHandler(FRT_IAbortHandler *handler) { _abortHandler = handler; } void SetReturnHandler(FRT_IReturnHandler *handler) { _returnHandler = handler; } diff --git a/fnet/src/vespa/fnet/frt/supervisor.cpp b/fnet/src/vespa/fnet/frt/supervisor.cpp index 927e2e84b94..e509223c005 100644 --- a/fnet/src/vespa/fnet/frt/supervisor.cpp +++ b/fnet/src/vespa/fnet/frt/supervisor.cpp @@ -91,22 +91,6 @@ FRT_Supervisor::GetListenPort() const } -bool -FRT_Supervisor::RunInvocation(FRT_RPCInvoker *invoker) -{ - // XXX: implement queue with max length + max # threads - - if (_threadPool == nullptr || - _threadPool->NewThread(invoker) == nullptr) - { - invoker->GetRequest()->SetError(FRTE_RPC_OVERLOAD, - "Could not start thread"); - return false; - } - return true; -} - - FRT_Target * FRT_Supervisor::GetTarget(const char *spec) { @@ -179,7 +163,7 @@ FRT_Supervisor::SetMethodMismatchHook(FRT_METHOD_PT method, { delete _methodMismatchHook; _methodMismatchHook = new FRT_Method("frt.hook.methodMismatch", "*", "*", - true, method, handler); + method, handler); assert(_methodMismatchHook != nullptr); } @@ -284,25 +268,17 @@ FRT_Supervisor::HandlePacket(FNET_Packet *packet, FNET_Context context) && _methodMismatchHook != nullptr) { invoker->ForceMethod(_methodMismatchHook); - return (invoker->Invoke(false)) ? + return (invoker->Invoke()) ? FNET_FREE_CHANNEL : FNET_CLOSE_CHANNEL; } invoker->HandleDone(false); return FNET_FREE_CHANNEL; - } else if (invoker->IsInstant()) { - - return (invoker->Invoke(false)) ? - FNET_FREE_CHANNEL : FNET_CLOSE_CHANNEL; - } else { - if (!RunInvocation(invoker)) { - invoker->HandleDone(false); - return FNET_FREE_CHANNEL; - } - return FNET_CLOSE_CHANNEL; + return (invoker->Invoke()) ? + FNET_FREE_CHANNEL : FNET_CLOSE_CHANNEL; } } @@ -349,17 +325,17 @@ FRT_Supervisor::RPCHooks::InitRPC(FRT_Supervisor *supervisor) { FRT_ReflectionBuilder rb(supervisor); //--------------------------------------------------------------------------- - rb.DefineMethod("frt.rpc.ping", "", "", true, + rb.DefineMethod("frt.rpc.ping", "", "", FRT_METHOD(FRT_Supervisor::RPCHooks::RPC_Ping), this); rb.MethodDesc("Method that may be used to check if the server is online"); //--------------------------------------------------------------------------- - rb.DefineMethod("frt.rpc.echo", "*", "*", true, + rb.DefineMethod("frt.rpc.echo", "*", "*", FRT_METHOD(FRT_Supervisor::RPCHooks::RPC_Echo), this); rb.MethodDesc("Echo the parameters as return values"); rb.ParamDesc("params", "Any set of parameters"); rb.ReturnDesc("return", "The parameter values"); //--------------------------------------------------------------------------- - rb.DefineMethod("frt.rpc.getMethodList", "", "SSS", true, + rb.DefineMethod("frt.rpc.getMethodList", "", "SSS", FRT_METHOD(FRT_Supervisor::RPCHooks::RPC_GetMethodList), this); rb.MethodDesc("Obtain a list of all available methods"); @@ -367,7 +343,7 @@ FRT_Supervisor::RPCHooks::InitRPC(FRT_Supervisor *supervisor) rb.ReturnDesc("params", "Method parameter types"); rb.ReturnDesc("return", "Method return types"); //--------------------------------------------------------------------------- - rb.DefineMethod("frt.rpc.getMethodInfo", "s", "sssSSSS", true, + rb.DefineMethod("frt.rpc.getMethodInfo", "s", "sssSSSS", FRT_METHOD(FRT_Supervisor::RPCHooks::RPC_GetMethodInfo), this); rb.MethodDesc("Obtain detailed information about a single method"); @@ -448,7 +424,7 @@ FRT_Supervisor::ConnHooks::SetSessionInitHook(FRT_METHOD_PT method, { delete _sessionInitHook; _sessionInitHook = new FRT_Method("frt.hook.sessionInit", "", "", - true, method, handler); + method, handler); assert(_sessionInitHook != nullptr); } @@ -459,7 +435,7 @@ FRT_Supervisor::ConnHooks::SetSessionDownHook(FRT_METHOD_PT method, { delete _sessionDownHook; _sessionDownHook = new FRT_Method("frt.hook.sessionDown", "", "", - true, method, handler); + method, handler); assert(_sessionDownHook != nullptr); } @@ -470,7 +446,7 @@ FRT_Supervisor::ConnHooks::SetSessionFiniHook(FRT_METHOD_PT method, { delete _sessionFiniHook; _sessionFiniHook = new FRT_Method("frt.hook.sessionFini", "", "", - true, method, handler); + method, handler); assert(_sessionFiniHook != nullptr); } diff --git a/fnet/src/vespa/fnet/frt/supervisor.h b/fnet/src/vespa/fnet/frt/supervisor.h index 051c1caceeb..dc7fb496239 100644 --- a/fnet/src/vespa/fnet/frt/supervisor.h +++ b/fnet/src/vespa/fnet/frt/supervisor.h @@ -99,8 +99,6 @@ public: bool Listen(int port); uint32_t GetListenPort() const; - bool RunInvocation(FRT_RPCInvoker *invoker); - FRT_Target *GetTarget(const char *spec); FRT_Target *Get2WayTarget(const char *spec, FNET_Context connContext = FNET_Context()); diff --git a/fnet/src/vespa/fnet/iocomponent.cpp b/fnet/src/vespa/fnet/iocomponent.cpp index 148dabf5c60..d4244cbf204 100644 --- a/fnet/src/vespa/fnet/iocomponent.cpp +++ b/fnet/src/vespa/fnet/iocomponent.cpp @@ -12,7 +12,6 @@ FNET_IOComponent::FNET_IOComponent(FNET_TransportThread *owner, : _ioc_next(nullptr), _ioc_prev(nullptr), _ioc_owner(owner), - _ioc_counters(_ioc_owner->GetStatCounters()), _ioc_socket_fd(socket_fd), _ioc_selector(nullptr), _ioc_spec(nullptr), diff --git a/fnet/src/vespa/fnet/iocomponent.h b/fnet/src/vespa/fnet/iocomponent.h index 16ecce2e345..901c3d1a5d0 100644 --- a/fnet/src/vespa/fnet/iocomponent.h +++ b/fnet/src/vespa/fnet/iocomponent.h @@ -2,14 +2,12 @@ #pragma once -#include "stats.h" #include <vespa/fastos/timestamp.h> #include <vespa/vespalib/net/selector.h> #include <mutex> #include <condition_variable> class FNET_TransportThread; -class FNET_StatCounters; class FNET_Config; /** @@ -45,7 +43,6 @@ protected: FNET_IOComponent *_ioc_next; // next in list FNET_IOComponent *_ioc_prev; // prev in list FNET_TransportThread *_ioc_owner; // owner(TransportThread) ref. - FNET_StatCounters *_ioc_counters; // stat counters int _ioc_socket_fd; // source of events. Selector *_ioc_selector; // attached event selector char *_ioc_spec; // connect/listen spec @@ -154,47 +151,6 @@ public: **/ void UpdateTimeOut(); - - /** - * Count packet read(s). This is a proxy method updating the stat - * counters associated with the owning transport object. - * - * @param cnt the number of packets read (default is 1). - **/ - void CountPacketRead(uint32_t cnt = 1) - { _ioc_counters->CountPacketRead(cnt); } - - - /** - * Count packet write(s). This is a proxy method updating the stat - * counters associated with the owning transport object. - * - * @param cnt the number of packets written (default is 1). - **/ - void CountPacketWrite(uint32_t cnt = 1) - { _ioc_counters->CountPacketWrite(cnt); } - - - /** - * Count read data. This is a proxy method updating the stat - * counters associated with the owning transport object. - * - * @param bytes the number of bytes read. - **/ - void CountDataRead(uint32_t bytes) - { _ioc_counters->CountDataRead(bytes); } - - - /** - * Count written data. This is a proxy method updating the stat - * counters associated with the owning transport object. - * - * @param bytes the number of bytes written. - **/ - void CountDataWrite(uint32_t bytes) - { _ioc_counters->CountDataWrite(bytes); } - - /** * Attach an event selector to this component. Before deleting an * IOC, one must first call detach_selector to detach the diff --git a/fnet/src/vespa/fnet/stats.cpp b/fnet/src/vespa/fnet/stats.cpp deleted file mode 100644 index f156fe4afe7..00000000000 --- a/fnet/src/vespa/fnet/stats.cpp +++ /dev/null @@ -1,98 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "stats.h" - -#include <vespa/log/log.h> -LOG_SETUP(".fnet"); - -FNET_StatCounters::FNET_StatCounters() - : _eventLoopCnt(0), - _eventCnt(0), - _ioEventCnt(0), - _packetReadCnt(0), - _packetWriteCnt(0), - _dataReadCnt(0), - _dataWriteCnt(0) -{ -} - - -FNET_StatCounters::~FNET_StatCounters() -{ -} - - -void -FNET_StatCounters::Clear() -{ - _eventLoopCnt = 0; - _eventCnt = 0; - _ioEventCnt = 0; - _packetReadCnt = 0; - _packetWriteCnt = 0; - _dataReadCnt = 0; - _dataWriteCnt = 0; -} - -//----------------------------------------------- - -FNET_Stats::FNET_Stats() - : _eventLoopRate(0), - _eventRate(0), - _ioEventRate(0), - _packetReadRate(0), - _packetWriteRate(0), - _dataReadRate(0), - _dataWriteRate(0) -{ -} - - -FNET_Stats::~FNET_Stats() -{ -} - - -void -FNET_Stats::Update(FNET_StatCounters *count, double secs) -{ - _eventLoopRate = (float)(FNET_STATS_OLD_FACTOR * _eventLoopRate - + (FNET_STATS_NEW_FACTOR - * ((double)count->_eventLoopCnt / secs))); - _eventRate = (float)(FNET_STATS_OLD_FACTOR * _eventRate - + (FNET_STATS_NEW_FACTOR - * ((double)count->_eventCnt / secs))); - _ioEventRate = (float)(FNET_STATS_OLD_FACTOR * _ioEventRate - + (FNET_STATS_NEW_FACTOR - * ((double)count->_ioEventCnt / secs))); - - _packetReadRate = (float)(FNET_STATS_OLD_FACTOR * _packetReadRate - + (FNET_STATS_NEW_FACTOR - * ((double)count->_packetReadCnt / secs))); - _packetWriteRate = (float)(FNET_STATS_OLD_FACTOR * _packetWriteRate - + (FNET_STATS_NEW_FACTOR - * ((double)count->_packetWriteCnt / secs))); - - _dataReadRate = (float)(FNET_STATS_OLD_FACTOR * _dataReadRate - + (FNET_STATS_NEW_FACTOR - * ((double)count->_dataReadCnt / (1000.0 * secs)))); - _dataWriteRate = (float)(FNET_STATS_OLD_FACTOR * _dataWriteRate - + (FNET_STATS_NEW_FACTOR - * ((double)count->_dataWriteCnt / (1000.0 * secs)))); -} - - -void -FNET_Stats::Log() -{ - LOG(info, "events[/s][loop/int/io][%.1f/%.1f/%.1f] " - "packets[/s][r/w][%.1f/%.1f] " - "data[kB/s][r/w][%.2f/%.2f]", - _eventLoopRate, - _eventRate, - _ioEventRate, - _packetReadRate, - _packetWriteRate, - _dataReadRate, - _dataWriteRate); -} diff --git a/fnet/src/vespa/fnet/stats.h b/fnet/src/vespa/fnet/stats.h deleted file mode 100644 index 76651393165..00000000000 --- a/fnet/src/vespa/fnet/stats.h +++ /dev/null @@ -1,103 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#pragma once - -#include <cstdint> - -/** - * This class is used internally by @ref FNET_Transport objects to - * aggregate FNET statistics. The actual statistics are located in the - * @ref FNET_Stats class. - **/ -class FNET_StatCounters -{ -public: - uint32_t _eventLoopCnt; // # event loop iterations - uint32_t _eventCnt; // # internal events - uint32_t _ioEventCnt; // # IO events - uint32_t _packetReadCnt; // # packets read - uint32_t _packetWriteCnt; // # packets written - uint32_t _dataReadCnt; // # bytes read - uint32_t _dataWriteCnt; // # bytes written - - FNET_StatCounters(); - ~FNET_StatCounters(); - - void Clear(); - void CountEventLoop(uint32_t cnt) { _eventLoopCnt += cnt; } - void CountEvent(uint32_t cnt) { _eventCnt += cnt; } - void CountIOEvent(uint32_t cnt) { _ioEventCnt += cnt; } - void CountPacketRead(uint32_t cnt) { _packetReadCnt += cnt; } - void CountPacketWrite(uint32_t cnt) { _packetWriteCnt += cnt; } - void CountDataRead(uint32_t bytes) { _dataReadCnt += bytes; } - void CountDataWrite(uint32_t bytes) { _dataWriteCnt += bytes; } -}; - -//----------------------------------------------- - -#define FNET_STATS_OLD_FACTOR 0.5 -#define FNET_STATS_NEW_FACTOR 0.5 - -/** - * This class contains various FNET statistics. The statistics for a - * @ref FNET_Transport object may be obtained by invoking the GetStats - * method on it. - **/ -class FNET_Stats -{ -public: - /** - * Event loop iterations per second. - **/ - float _eventLoopRate; // loop iterations/s - - /** - * Internal events handled per second. - **/ - float _eventRate; // internal-events/s - - /** - * IO events handled per second. - **/ - float _ioEventRate; // IO-events/s - - /** - * Packets read per second. - **/ - float _packetReadRate; // packets/s - - /** - * Packets written per second. - **/ - float _packetWriteRate; // packets/s - - /** - * Data read per second (in kB). - **/ - float _dataReadRate; // kB/s - - /** - * Data written per second (in kB). - **/ - float _dataWriteRate; // kB/s - - FNET_Stats(); - ~FNET_Stats(); - - /** - * Update statistics. The new statistics are calculated based on - * both the current values and the input count structure indicating - * what has happened since the last statistics update. - * - * @param count what has happened since last statistics update. - * @param secs number of seconds since last statistics update. - **/ - void Update(FNET_StatCounters *count, double secs); - - /** - * Invoking this method will generate a log message of type - * FNET_INFO showing the values held by this object. - **/ - void Log(); -}; - diff --git a/fnet/src/vespa/fnet/transport.cpp b/fnet/src/vespa/fnet/transport.cpp index 55c847682c4..dfeb8d03436 100644 --- a/fnet/src/vespa/fnet/transport.cpp +++ b/fnet/src/vespa/fnet/transport.cpp @@ -120,14 +120,6 @@ FNET_Transport::SetTCPNoDelay(bool noDelay) } void -FNET_Transport::SetLogStats(bool logStats) -{ - for (const auto &thread: _threads) { - thread->SetLogStats(logStats); - } -} - -void FNET_Transport::sync() { for (const auto &thread: _threads) { diff --git a/fnet/src/vespa/fnet/transport.h b/fnet/src/vespa/fnet/transport.h index dbf914798fd..15e69bd66a6 100644 --- a/fnet/src/vespa/fnet/transport.h +++ b/fnet/src/vespa/fnet/transport.h @@ -195,14 +195,6 @@ public: void SetTCPNoDelay(bool noDelay); /** - * Enable or disable logging of FNET statistics. This feature is - * disabled by default. - * - * @param logStats true if stats should be logged. - **/ - void SetLogStats(bool logStats); - - /** * Synchronize with all transport threads. This method will block * until all events posted before this method was invoked has been * processed. If a transport thread has been shut down (or is in diff --git a/fnet/src/vespa/fnet/transport_thread.cpp b/fnet/src/vespa/fnet/transport_thread.cpp index 34ab9091072..b0388bdc140 100644 --- a/fnet/src/vespa/fnet/transport_thread.cpp +++ b/fnet/src/vespa/fnet/transport_thread.cpp @@ -31,15 +31,6 @@ struct Sync : public FNET_IExecutable } // namespace<unnamed> -#ifndef IAM_DOXYGEN -void -FNET_TransportThread::StatsTask::PerformTask() -{ - _transport->UpdateStats(); - Schedule(5.0); -} -#endif - void FNET_TransportThread::AddComponent(FNET_IOComponent *comp) { @@ -160,22 +151,6 @@ FNET_TransportThread::DiscardEvent(FNET_ControlPacket *cpacket, } -void -FNET_TransportThread::UpdateStats() -{ - _now.SetNow(); // trade some overhead for better stats - double ms = _now.MilliSecs() - _statTime.MilliSecs(); - _statTime = _now; - { - std::lock_guard<std::mutex> guard(_lock); - _stats.Update(&_counters, ms / 1000.0); - } - _counters.Clear(); - - if (_config._logStats) - _stats.Log(); -} - extern "C" { static void pipehandler(int) @@ -203,10 +178,6 @@ FNET_TransportThread::FNET_TransportThread(FNET_Transport &owner_in) _startTime(), _now(), _scheduler(&_now), - _counters(), - _stats(), - _statsTask(&_scheduler, this), - _statTime(), _config(), _componentsHead(nullptr), _timeOutHead(nullptr), @@ -424,8 +395,6 @@ FNET_TransportThread::InitEventLoop() } _now.SetNow(); _startTime = _now; - _statTime = _now; - _statsTask.Schedule(5.0); return true; } @@ -435,7 +404,7 @@ FNET_TransportThread::handle_wakeup() { { std::lock_guard<std::mutex> guard(_lock); - CountEvent(_queue.FlushPackets_NoLock(&_myQueue)); + _queue.FlushPackets_NoLock(&_myQueue); } FNET_Context context; @@ -534,7 +503,6 @@ FNET_TransportThread::EventLoopIteration() // obtain I/O events _selector.poll(msTimeout); - CountEventLoop(); // sample current time (performed once per event loop iteration) _now.SetNow(); @@ -548,7 +516,6 @@ FNET_TransportThread::EventLoopIteration() #endif // handle wakeup and io-events - CountIOEvent(_selector.num_events()); _selector.dispatch(*this); // handle IOC time-outs @@ -579,9 +546,6 @@ FNET_TransportThread::EventLoopIteration() if (_finished) return false; - // unschedule statistics task - _statsTask.Kill(); - // flush event queue { std::lock_guard<std::mutex> guard(_lock); diff --git a/fnet/src/vespa/fnet/transport_thread.h b/fnet/src/vespa/fnet/transport_thread.h index 3e5fe49e73a..1b8d1fa4eeb 100644 --- a/fnet/src/vespa/fnet/transport_thread.h +++ b/fnet/src/vespa/fnet/transport_thread.h @@ -6,7 +6,6 @@ #include "config.h" #include "task.h" #include "packetqueue.h" -#include "stats.h" #include <vespa/fastos/thread.h> #include <vespa/fastos/time.h> #include <vespa/vespalib/net/socket_handle.h> @@ -31,31 +30,11 @@ class FNET_TransportThread : public FastOS_Runnable public: using Selector = vespalib::Selector<FNET_IOComponent>; -#ifndef IAM_DOXYGEN - class StatsTask : public FNET_Task - { - private: - FNET_TransportThread *_transport; - StatsTask(const StatsTask &); - StatsTask &operator=(const StatsTask &); - public: - StatsTask(FNET_Scheduler *scheduler, - FNET_TransportThread *transport) : FNET_Task(scheduler), - _transport(transport) {} - void PerformTask() override; - }; - friend class FNET_TransportThread::StatsTask; -#endif // DOXYGEN - private: FNET_Transport &_owner; // owning transport layer FastOS_Time _startTime; // when event loop started FastOS_Time _now; // current time sampler FNET_Scheduler _scheduler; // transport thread scheduler - FNET_StatCounters _counters; // stat counters - FNET_Stats _stats; // current stats - StatsTask _statsTask; // stats task - FastOS_Time _statTime; // last stat update FNET_Config _config; // FNET configuration [static] FNET_IOComponent *_componentsHead; // I/O component list head FNET_IOComponent *_timeOutHead; // first IOC in list to time out @@ -156,49 +135,6 @@ private: /** - * Update internal FNET statistics. This method is called regularly - * by the statistics update task. - **/ - void UpdateStats(); - - - /** - * Obtain a reference to the stat counters used by this transport - * object. - * - * @return stat counters for this transport object. - **/ - FNET_StatCounters *GetStatCounters() { return &_counters; } - - - /** - * Count event loop iteration(s). - * - * @param cnt event loop iterations (default is 1). - **/ - void CountEventLoop(uint32_t cnt = 1) - { _counters.CountEventLoop(cnt); } - - - /** - * Count internal event(s). - * - * @param cnt number of internal events. - **/ - void CountEvent(uint32_t cnt) - { _counters.CountEvent(cnt); } - - - /** - * Count IO events. - * - * @param cnt number of IO events. - **/ - void CountIOEvent(uint32_t cnt) - { _counters.CountIOEvent(cnt); } - - - /** * Obtain a reference to the object holding the configuration for * this transport object. * @@ -355,15 +291,6 @@ public: /** - * Enable or disable logging of FNET statistics. This feature is - * disabled by default. - * - * @param logStats true if stats should be logged. - **/ - void SetLogStats(bool logStats) { _config._logStats = logStats; } - - - /** * Add an I/O component to the working set of this transport * object. Note that the actual work is performed by the transport * thread. This method simply posts an event on the transport thread |