diff options
-rw-r--r-- | fnet/CMakeLists.txt | 1 | ||||
-rw-r--r-- | fnet/src/tests/scheduling/schedule.cpp | 2 | ||||
-rw-r--r-- | fnet/src/tests/scheduling/sloweventloop.cpp | 2 | ||||
-rw-r--r-- | fnet/src/tests/transport_debugger/CMakeLists.txt | 8 | ||||
-rw-r--r-- | fnet/src/tests/transport_debugger/transport_debugger_test.cpp | 121 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/CMakeLists.txt | 1 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/frt/supervisor.cpp | 19 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/frt/supervisor.h | 5 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/scheduler.cpp | 18 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/scheduler.h | 8 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/transport.cpp | 92 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/transport.h | 56 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/transport_debugger.cpp | 69 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/transport_debugger.h | 60 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/transport_thread.cpp | 14 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/transport_thread.h | 3 | ||||
-rw-r--r-- | slobrok/src/tests/local_rpc_monitor_map/local_rpc_monitor_map_test.cpp | 2 |
17 files changed, 441 insertions, 40 deletions
diff --git a/fnet/CMakeLists.txt b/fnet/CMakeLists.txt index 142c19fca3e..3222b36819d 100644 --- a/fnet/CMakeLists.txt +++ b/fnet/CMakeLists.txt @@ -29,4 +29,5 @@ vespa_define_module( src/tests/sync_execute src/tests/thread_selection src/tests/time + src/tests/transport_debugger ) diff --git a/fnet/src/tests/scheduling/schedule.cpp b/fnet/src/tests/scheduling/schedule.cpp index 55fe4c16398..fcc5c0805d9 100644 --- a/fnet/src/tests/scheduling/schedule.cpp +++ b/fnet/src/tests/scheduling/schedule.cpp @@ -81,7 +81,7 @@ public: TEST("schedule") { _time = steady_time(vespalib::duration::zero()); - _scheduler = new FNET_Scheduler(&_time, &_time); + _scheduler = new FNET_Scheduler(&_time); RealTimeTask rt_task1; RealTimeTask rt_task2; diff --git a/fnet/src/tests/scheduling/sloweventloop.cpp b/fnet/src/tests/scheduling/sloweventloop.cpp index 33070aa29e9..bb07cba6696 100644 --- a/fnet/src/tests/scheduling/sloweventloop.cpp +++ b/fnet/src/tests/scheduling/sloweventloop.cpp @@ -20,7 +20,7 @@ public: TEST("slow event loop") { vespalib::steady_time t(vespalib::duration::zero()); - FNET_Scheduler scheduler(&t, &t); + FNET_Scheduler scheduler(&t); MyTask task(scheduler); MyTask task2(scheduler); diff --git a/fnet/src/tests/transport_debugger/CMakeLists.txt b/fnet/src/tests/transport_debugger/CMakeLists.txt new file mode 100644 index 00000000000..70dcadaf695 --- /dev/null +++ b/fnet/src/tests/transport_debugger/CMakeLists.txt @@ -0,0 +1,8 @@ +# Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_executable(fnet_transport_debugger_test_app TEST + SOURCES + transport_debugger_test.cpp + DEPENDS + fnet +) +vespa_add_test(NAME fnet_transport_debugger_test_app COMMAND fnet_transport_debugger_test_app) diff --git a/fnet/src/tests/transport_debugger/transport_debugger_test.cpp b/fnet/src/tests/transport_debugger/transport_debugger_test.cpp new file mode 100644 index 00000000000..6c2cfb5cd76 --- /dev/null +++ b/fnet/src/tests/transport_debugger/transport_debugger_test.cpp @@ -0,0 +1,121 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/vespalib/testkit/test_kit.h> +#include <vespa/vespalib/testkit/time_bomb.h> +#include <vespa/fnet/transport.h> +#include <vespa/fnet/transport_thread.h> +#include <vespa/fnet/transport_debugger.h> +#include <vespa/fnet/task.h> +#include <vespa/fnet/frt/supervisor.h> +#include <vespa/fnet/frt/rpcrequest.h> +#include <vespa/fnet/frt/invoker.h> +#include <vespa/fnet/frt/target.h> +#include <vespa/vespalib/net/tls/tls_crypto_engine.h> +#include <vespa/vespalib/test/make_tls_options_for_testing.h> + +vespalib::CryptoEngine::SP tls_crypto = std::make_shared<vespalib::TlsCryptoEngine>(vespalib::test::make_tls_options_for_testing()); + +struct Service : FRT_Invokable { + fnet::frt::StandaloneFRT frt; + Service(fnet::TimeTools::SP time_tools) + : frt(TransportConfig(4).crypto(tls_crypto).time_tools(time_tools)) + { + init_rpc(); + ASSERT_TRUE(frt.supervisor().Listen(0)); + } + FNET_Transport &transport() { return *frt.supervisor().GetTransport(); } + int listen_port() const { + return frt.supervisor().GetListenPort(); + } + FRT_Target *connect(int port) { + return frt.supervisor().GetTarget(port); + } + void init_rpc() { + FRT_ReflectionBuilder rb(&frt.supervisor()); + rb.DefineMethod("inc", "l", "l", FRT_METHOD(Service::rpc_inc), this); + rb.MethodDesc("increment a 64-bit integer, returns after 5 seconds"); + rb.ParamDesc("in", "an integer (64 bit)"); + rb.ReturnDesc("out", "in + 1 (64 bit)"); + } + struct ReturnLater : FNET_Task { + FRT_RPCRequest *req; + ReturnLater(FNET_Scheduler *scheduler, FRT_RPCRequest *req_in) + : FNET_Task(scheduler), req(req_in) {} + void PerformTask() override { req->Return(); } + }; + void rpc_inc(FRT_RPCRequest *req) { + req->Detach(); + FRT_Values ¶ms = *req->GetParams(); + FRT_Values &ret = *req->GetReturn(); + ret.AddInt64(params[0]._intval64 + 1); + auto my_scheduler = req->GetConnection()->Owner()->GetScheduler(); + auto &task = req->getStash().create<ReturnLater>(my_scheduler, req); + task.Schedule(5.0); + } + ~Service() = default; +}; + +struct Fixture { + fnet::TransportDebugger debugger; + Service server; + Service client; + Fixture() + : debugger(), + server(debugger.time_tools()), + client(debugger.time_tools()) + { + debugger.attach({server.transport(), client.transport()}); + } + ~Fixture() { + debugger.detach(); + } +}; + +struct MyWait : FRT_IRequestWait { + FRT_RPCRequest *req = nullptr; + void RequestDone(FRT_RPCRequest *r) override { req = r; } +}; + +TEST_FF("transport layers can be run with transport debugger", Fixture(), vespalib::TimeBomb(60)) { + MyWait w4; // short timeout, should fail + MyWait w6; // long timeout, should be ok + + FRT_Target *target = f1.client.connect(f1.server.listen_port()); + + FRT_RPCRequest *req4 = f1.client.frt.supervisor().AllocRPCRequest(); + req4->SetMethodName("inc"); + req4->GetParams()->AddInt64(3); + target->InvokeAsync(req4, 4.0, &w4); + + FRT_RPCRequest *req6 = f1.client.frt.supervisor().AllocRPCRequest(); + req6->SetMethodName("inc"); + req6->GetParams()->AddInt64(7); + target->InvokeAsync(req6, 6.0, &w6); + + bool got4 = false; + bool got6 = false; + size_t steps = 0; + + while (!(got4 && got6)) { + f1.debugger.step(); + ++steps; + if (!got4 && w4.req) { + got4 = true; + fprintf(stderr, "request with 4s timeout completed after %zu steps (~%zu ms)\n", steps, steps * 5); + } + if (!got6 && w6.req) { + got6 = true; + fprintf(stderr, "request with 6s timeout completed after %zu steps (~%zu ms)\n", steps, steps * 5); + } + } + ASSERT_EQUAL(req4, w4.req); + ASSERT_EQUAL(req6, w6.req); + EXPECT_EQUAL(req4->GetErrorCode(), FRTE_RPC_TIMEOUT); + ASSERT_TRUE(req6->CheckReturnTypes("l")); + EXPECT_EQUAL(req6->GetReturn()->GetValue(0)._intval64, 8u); + target->SubRef(); + req4->SubRef(); + req6->SubRef(); +} + +TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/fnet/src/vespa/fnet/CMakeLists.txt b/fnet/src/vespa/fnet/CMakeLists.txt index 4b9d818e5ed..8accc83e90a 100644 --- a/fnet/src/vespa/fnet/CMakeLists.txt +++ b/fnet/src/vespa/fnet/CMakeLists.txt @@ -19,6 +19,7 @@ vespa_add_library(fnet simplepacketstreamer.cpp task.cpp transport.cpp + transport_debugger.cpp transport_thread.cpp $<TARGET_OBJECTS:fnet_frt> INSTALL lib64 diff --git a/fnet/src/vespa/fnet/frt/supervisor.cpp b/fnet/src/vespa/fnet/frt/supervisor.cpp index d992567f776..3264e80ae7b 100644 --- a/fnet/src/vespa/fnet/frt/supervisor.cpp +++ b/fnet/src/vespa/fnet/frt/supervisor.cpp @@ -8,6 +8,7 @@ #include <vespa/fnet/transport_thread.h> #include <vespa/fnet/connector.h> #include <vespa/fastos/thread.h> +#include <vespa/vespalib/util/require.h> FRT_Supervisor::FRT_Supervisor(FNET_Transport *transport) : _transport(transport), @@ -409,21 +410,19 @@ FRT_Supervisor::SchedulerPtr::SchedulerPtr(FNET_TransportThread *transport_threa namespace fnet::frt { -StandaloneFRT::StandaloneFRT() +StandaloneFRT::StandaloneFRT(const TransportConfig &config) : _threadPool(std::make_unique<FastOS_ThreadPool>(1024*128)), - _transport(std::make_unique<FNET_Transport>()), + _transport(std::make_unique<FNET_Transport>(config)), _supervisor(std::make_unique<FRT_Supervisor>(_transport.get())) { - _transport->Start(_threadPool.get()); + REQUIRE(_transport->Start(_threadPool.get())); } -StandaloneFRT::StandaloneFRT(vespalib::CryptoEngine::SP crypto) - : _threadPool(std::make_unique<FastOS_ThreadPool>(1024*128)), - _transport(std::make_unique<FNET_Transport>(TransportConfig().crypto(std::move(crypto)))), - _supervisor(std::make_unique<FRT_Supervisor>(_transport.get())) -{ - _transport->Start(_threadPool.get()); -} +StandaloneFRT::StandaloneFRT() + : StandaloneFRT(TransportConfig()) {} + +StandaloneFRT::StandaloneFRT(std::shared_ptr<vespalib::CryptoEngine> crypto) + : StandaloneFRT(TransportConfig().crypto(std::move(crypto))) {} StandaloneFRT::~StandaloneFRT() { diff --git a/fnet/src/vespa/fnet/frt/supervisor.h b/fnet/src/vespa/fnet/frt/supervisor.h index 2743cafae26..079fac9c7e9 100644 --- a/fnet/src/vespa/fnet/frt/supervisor.h +++ b/fnet/src/vespa/fnet/frt/supervisor.h @@ -10,6 +10,7 @@ #include <vespa/fnet/connection.h> #include <vespa/fnet/simplepacketstreamer.h> +class TransportConfig; class FNET_Transport; class FRT_Target; class FastOS_ThreadPool; @@ -129,10 +130,12 @@ namespace fnet::frt { */ class StandaloneFRT { public: + explicit StandaloneFRT(const TransportConfig &config); StandaloneFRT(); explicit StandaloneFRT(std::shared_ptr<vespalib::CryptoEngine> crypto); ~StandaloneFRT(); - FRT_Supervisor & supervisor() { return *_supervisor; } + FRT_Supervisor &supervisor() { return *_supervisor; } + const FRT_Supervisor &supervisor() const { return *_supervisor; } void shutdown(); private: std::unique_ptr<FastOS_ThreadPool> _threadPool; diff --git a/fnet/src/vespa/fnet/scheduler.cpp b/fnet/src/vespa/fnet/scheduler.cpp index f54d3d6a9bb..9327c52c308 100644 --- a/fnet/src/vespa/fnet/scheduler.cpp +++ b/fnet/src/vespa/fnet/scheduler.cpp @@ -9,8 +9,7 @@ LOG_SETUP(".fnet.scheduler"); -FNET_Scheduler::FNET_Scheduler(vespalib::steady_time *sampler, - vespalib::steady_time *now) +FNET_Scheduler::FNET_Scheduler(vespalib::steady_time *sampler) : _cond(), _next(), _now(), @@ -25,13 +24,8 @@ FNET_Scheduler::FNET_Scheduler(vespalib::steady_time *sampler, for (int i = 0; i < NUM_SLOTS; i++) _slots[i] = nullptr; _slots[NUM_SLOTS] = nullptr; - - if (now != nullptr) { - _next = *now; - } else { - _next = vespalib::steady_clock::now(); - } - _next += tick_ms; + _now = _sampler ? *_sampler : vespalib::steady_clock::now(); + _next = _now + tick_ms; } @@ -143,11 +137,7 @@ FNET_Scheduler::Print(FILE *dst) void FNET_Scheduler::CheckTasks() { - if (_sampler != nullptr) { - _now = *_sampler; - } else { - _now = vespalib::steady_clock::now(); - } + _now = _sampler ? *_sampler : vespalib::steady_clock::now(); // assume timely value propagation diff --git a/fnet/src/vespa/fnet/scheduler.h b/fnet/src/vespa/fnet/scheduler.h index 98a9442799e..e5494f229fa 100644 --- a/fnet/src/vespa/fnet/scheduler.h +++ b/fnet/src/vespa/fnet/scheduler.h @@ -68,12 +68,10 @@ public: * @param sampler if given, this object will be used to obtain the * time when the @ref CheckTasks method is invoked. If a * sampler is not given, time sampling will be - * handled internally. - * @param now if given, indicates the current time. This value is - * used by the constructor to init internal variables. + * handled internally. The sampler will also be used by + * the constructor to init internal variables. **/ - FNET_Scheduler(vespalib::steady_time *sampler = nullptr, - vespalib::steady_time *now = nullptr); + FNET_Scheduler(vespalib::steady_time *sampler = nullptr); virtual ~FNET_Scheduler(); diff --git a/fnet/src/vespa/fnet/transport.cpp b/fnet/src/vespa/fnet/transport.cpp index 81d35f9852c..a7ec883b550 100644 --- a/fnet/src/vespa/fnet/transport.cpp +++ b/fnet/src/vespa/fnet/transport.cpp @@ -5,6 +5,7 @@ #include "iocomponent.h" #include <vespa/vespalib/util/threadstackexecutor.h> #include <vespa/vespalib/util/size_literals.h> +#include <vespa/vespalib/util/rendezvous.h> #include <chrono> #include <xxhash.h> @@ -24,12 +25,83 @@ struct HashState { VESPA_THREAD_STACK_TAG(fnet_work_pool); +struct DefaultTimeTools : fnet::TimeTools { + vespalib::duration event_timeout() const override { + return FNET_Scheduler::tick_ms; + } + vespalib::steady_time current_time() const override { + return vespalib::steady_clock::now(); + } +}; + +struct DebugTimeTools : fnet::TimeTools { + vespalib::duration my_event_timeout; + std::function<vespalib::steady_time()> my_current_time; + DebugTimeTools(vespalib::duration d, std::function<vespalib::steady_time()> f) noexcept + : my_event_timeout(d), my_current_time(std::move(f)) {} + vespalib::duration event_timeout() const override { + return my_event_timeout; + } + vespalib::steady_time current_time() const override { + return my_current_time(); + } +}; + +struct CaptureMeet : vespalib::Rendezvous<int,bool> { + using SP = std::shared_ptr<CaptureMeet>; + vespalib::SyncableThreadExecutor &work_pool; + vespalib::AsyncResolver &async_resolver; + std::function<bool()> capture_hook; + CaptureMeet(size_t N, + vespalib::SyncableThreadExecutor &work_pool_in, + vespalib::AsyncResolver &resolver_in, + std::function<bool()> capture_hook_in) + : vespalib::Rendezvous<int,bool>(N), + work_pool(work_pool_in), + async_resolver(resolver_in), + capture_hook(std::move(capture_hook_in)) {} + void mingle() override { + work_pool.sync(); + async_resolver.wait_for_pending_resolves(); + bool result = capture_hook(); + for (size_t i = 0; i < size(); ++i) { + out(i) = result; + } + } +}; + +struct CaptureTask : FNET_Task { + CaptureMeet::SP meet; + CaptureTask(FNET_Scheduler *scheduler, CaptureMeet::SP meet_in) + : FNET_Task(scheduler), meet(std::move(meet_in)) {} + void PerformTask() override { + int dummy_value = 0; // rendezvous must have input value + if (meet->rendezvous(dummy_value)) { + ScheduleNow(); + } else { + delete this; + } + }; +}; + } // namespace <unnamed> +namespace fnet { + +TimeTools::SP +TimeTools::make_debug(vespalib::duration event_timeout, + std::function<vespalib::steady_time()> current_time) +{ + return std::make_shared<DebugTimeTools>(event_timeout, std::move(current_time)); +} + +} // fnet + TransportConfig::TransportConfig(int num_threads) : _config(), _resolver(), _crypto(), + _time_tools(), _num_threads(num_threads) {} @@ -39,14 +111,21 @@ vespalib::AsyncResolver::SP TransportConfig::resolver() const { return _resolver ? _resolver : vespalib::AsyncResolver::get_shared(); } + vespalib::CryptoEngine::SP TransportConfig::crypto() const { return _crypto ? _crypto : vespalib::CryptoEngine::get_default(); } -FNET_Transport::FNET_Transport(TransportConfig cfg) +fnet::TimeTools::SP +TransportConfig::time_tools() const { + return _time_tools ? _time_tools : std::make_shared<DefaultTimeTools>(); +} + +FNET_Transport::FNET_Transport(const TransportConfig &cfg) : _async_resolver(cfg.resolver()), _crypto_engine(cfg.crypto()), + _time_tools(cfg.time_tools()), _work_pool(std::make_unique<vespalib::ThreadStackExecutor>(1, 128_Ki, fnet_work_pool, 1024)), _threads(), _config(cfg.config()) @@ -175,6 +254,17 @@ FNET_Transport::Start(FastOS_ThreadPool *pool) } void +FNET_Transport::attach_capture_hook(std::function<bool()> capture_hook) +{ + auto meet = std::make_shared<CaptureMeet>(_threads.size(), *_work_pool, *_async_resolver, std::move(capture_hook)); + for (auto &thread: _threads) { + // tasks will be deleted when the capture_hook returns false + auto *task = new CaptureTask(thread->GetScheduler(), meet); + task->ScheduleNow(); + } +} + +void FNET_Transport::Add(FNET_IOComponent *comp, bool needRef) { comp->Owner()->Add(comp, needRef); } diff --git a/fnet/src/vespa/fnet/transport.h b/fnet/src/vespa/fnet/transport.h index 6a59f9da66c..117bf6c93b8 100644 --- a/fnet/src/vespa/fnet/transport.h +++ b/fnet/src/vespa/fnet/transport.h @@ -8,6 +8,7 @@ #include <vector> #include <vespa/vespalib/net/async_resolver.h> #include <vespa/vespalib/net/crypto_engine.h> +#include <vespa/vespalib/util/time.h> class FNET_TransportThread; class FastOS_ThreadPool; @@ -17,6 +18,29 @@ class FNET_IServerAdapter; class FNET_IPacketHandler; class FNET_Scheduler; +namespace fnet { + +/** + * Low-level abstraction for event-loop time management. The + * event_timeout function returns the timeout to be used when waiting + * for io-events. The current_time function returns the current + * time. This interface may be implemented to control both how time is + * spent (event_timeout) as well as how time is observed + * (current_time). The default implementation will use + * FNET_Scheduler::tick_ms as event timeout and + * vespalib::steady_clock::now() as current time. + **/ +struct TimeTools { + using SP = std::shared_ptr<TimeTools>; + virtual vespalib::duration event_timeout() const = 0; + virtual vespalib::steady_time current_time() const = 0; + virtual ~TimeTools() = default; + static TimeTools::SP make_debug(vespalib::duration event_timeout, + std::function<vespalib::steady_time()> current_time); +}; + +} // fnet + class TransportConfig { public: TransportConfig() : TransportConfig(1) {} @@ -24,6 +48,7 @@ public: ~TransportConfig(); vespalib::AsyncResolver::SP resolver() const; vespalib::CryptoEngine::SP crypto() const; + fnet::TimeTools::SP time_tools() const; TransportConfig & resolver(vespalib::AsyncResolver::SP resolver_in) { _resolver = std::move(resolver_in); return *this; @@ -32,6 +57,11 @@ public: _crypto = std::move(crypto_in); return *this; } + TransportConfig &time_tools(fnet::TimeTools::SP time_tools_in) { + _time_tools = std::move(time_tools_in); + return *this; + } + const FNET_Config & config() const { return _config; } uint32_t num_threads() const { return _num_threads; } @@ -62,8 +92,10 @@ private: FNET_Config _config; vespalib::AsyncResolver::SP _resolver; vespalib::CryptoEngine::SP _crypto; + fnet::TimeTools::SP _time_tools; uint32_t _num_threads; }; + /** * This class represents the transport layer and handles a collection * of transport threads. Note: remember to shut down your transport @@ -77,6 +109,7 @@ private: vespalib::AsyncResolver::SP _async_resolver; vespalib::CryptoEngine::SP _crypto_engine; + fnet::TimeTools::SP _time_tools; std::unique_ptr<vespalib::SyncableThreadExecutor> _work_pool; Threads _threads; const FNET_Config _config; @@ -91,7 +124,7 @@ public: * the current thread become the transport thread. Main may only * be called for single-threaded transports. **/ - explicit FNET_Transport(TransportConfig config); + explicit FNET_Transport(const TransportConfig &config); explicit FNET_Transport(uint32_t num_threads) : FNET_Transport(TransportConfig(num_threads)) {} @@ -100,6 +133,7 @@ public: ~FNET_Transport(); const FNET_Config & getConfig() const { return _config; } + const fnet::TimeTools &time_tools() const { return *_time_tools; } /** * Try to execute the given task on the internal work pool @@ -284,6 +318,26 @@ public: **/ bool Start(FastOS_ThreadPool *pool); + /** + * Capture transport threads. Used for testing purposes, + * preferably combined with a debug variant of TimeTools. + * + * After this function is called, the capture_hook will be called + * repeatedly as long as it returns true. The first time it + * returns false, appropriate cleanup will be performed and the + * capture_hook will never be called again; it detaches + * itself. All transport threads will be blocked while the + * capture_hook is called. Between calls to the capture_hook each + * transport thread will run its event loop exactly once, all + * pending work in the work pool will be performed and all pending + * dns lookups will be performed. Note that the capture_hook + * should detach itself by returning false before the transport + * itself is shut down. + * + * @param capture_hook called until it returns false + **/ + void attach_capture_hook(std::function<bool()> capture_hook); + //------------------------------------------------------------------------- // forward async IO Component operations to their owners //------------------------------------------------------------------------- diff --git a/fnet/src/vespa/fnet/transport_debugger.cpp b/fnet/src/vespa/fnet/transport_debugger.cpp new file mode 100644 index 00000000000..1878b921171 --- /dev/null +++ b/fnet/src/vespa/fnet/transport_debugger.cpp @@ -0,0 +1,69 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "transport_debugger.h" +#include <vespa/vespalib/util/require.h> +#include <cassert> + +namespace fnet { + +void +TransportDebugger::Meet::mingle() +{ + bool call_again = true; + for (size_t i = 0; i < size(); ++i) { + if (!in(i)) { + call_again = false; + } + } + for (size_t i = 0; i < size(); ++i) { + out(i) = call_again; + } +} + +TransportDebugger::TransportDebugger() + : _time(), + _meet() +{ +} + +TransportDebugger::~TransportDebugger() +{ + assert(!_meet && "error: still attached"); +} + +void +TransportDebugger::attach(std::initializer_list<std::reference_wrapper<FNET_Transport> > list) +{ + size_t N = list.size() + 1; + REQUIRE(!_meet); + REQUIRE(N > 1); + _meet = std::make_shared<Meet>(N); + for (auto &item: list) { + item.get().attach_capture_hook([meet = _meet]() + { + REQUIRE(meet->rendezvous(true)); + // capture point: between meetings + return meet->rendezvous(true); + }); + } + REQUIRE(_meet->rendezvous(true)); // capture transport threads +} + +void +TransportDebugger::step() +{ + REQUIRE(_meet); + _time += 5ms; // pretend 5ms passes between each event loop iteration + REQUIRE(_meet->rendezvous(true)); // release transport threads + REQUIRE(_meet->rendezvous(true)); // capture transport threads +} + +void +TransportDebugger::detach() +{ + REQUIRE(_meet); + REQUIRE(!_meet->rendezvous(false)); // release transport threads (final time) + _meet.reset(); +} + +} diff --git a/fnet/src/vespa/fnet/transport_debugger.h b/fnet/src/vespa/fnet/transport_debugger.h new file mode 100644 index 00000000000..ed3738bb9fe --- /dev/null +++ b/fnet/src/vespa/fnet/transport_debugger.h @@ -0,0 +1,60 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "transport.h" +#include <vespa/vespalib/util/rendezvous.h> +#include <vespa/vespalib/util/time.h> +#include <memory> + +namespace fnet { + +/** + * This class is used to control transport threads during unit + * testing. + * + * The TimeTools created by this class should be used when setting up + * all transports used in the test. The supplied TimeTools will make + * sure no thread ever blocks waiting for io-events and also make sure + * all threads observe the same externally controlled current + * time. After the transport layers are started, the attach function + * is used to start controlling event loop execution. While attached, + * calling the step function will run each transport thread event loop + * exactly once (in parallel), wait for pending dns resolving, wait + * for pending tls handshake work and advance the current time with + * 5ms (making sure 'time passes' and 'stuff happens' at a reasonable + * relative rate). It is important to call detach to release the + * transports before trying to shut them down. + * + * Note that both server and client should be controlled by the same + * debugger when testing rpc. Using external services will result in + * (synthetic) time passing too fast compared to stuff actually + * happening, since you do not control the other end-point in the same + * way. + * + * Take a look at the unit test for this class for an example of how + * to use it. + **/ +class TransportDebugger +{ +private: + struct Meet : vespalib::Rendezvous<bool,bool> { + Meet(size_t N) : vespalib::Rendezvous<bool,bool>(N) {} + void mingle() override; + }; + vespalib::steady_time _time; + std::shared_ptr<Meet> _meet; + +public: + TransportDebugger(); + ~TransportDebugger(); + vespalib::steady_time time() const { return _time; } + TimeTools::SP time_tools() { + return TimeTools::make_debug(vespalib::duration::zero(), [this]() noexcept { return time(); }); + } + void attach(std::initializer_list<std::reference_wrapper<FNET_Transport> > list); + void step(); + void detach(); +}; + +} diff --git a/fnet/src/vespa/fnet/transport_thread.cpp b/fnet/src/vespa/fnet/transport_thread.cpp index 5e1a9759a60..760c951fedd 100644 --- a/fnet/src/vespa/fnet/transport_thread.cpp +++ b/fnet/src/vespa/fnet/transport_thread.cpp @@ -207,7 +207,7 @@ extern "C" { FNET_TransportThread::FNET_TransportThread(FNET_Transport &owner_in) : _owner(owner_in), - _now(steady_clock ::now()), + _now(owner_in.time_tools().current_time()), _scheduler(&_now), _componentsHead(nullptr), _timeOutHead(nullptr), @@ -246,6 +246,12 @@ FNET_TransportThread::getConfig() const { return _owner.getConfig(); } +const fnet::TimeTools & +FNET_TransportThread::time_tools() const +{ + return _owner.time_tools(); +} + bool FNET_TransportThread::tune(SocketHandle &handle) const { @@ -388,7 +394,7 @@ FNET_TransportThread::InitEventLoop() LOG(error, "Transport: InitEventLoop: object already active!"); return false; } - _now = steady_clock::now(); + _now = time_tools().current_time(); return true; } @@ -465,12 +471,12 @@ bool FNET_TransportThread::EventLoopIteration() { if (!IsShutDown()) { - int msTimeout = FNET_Scheduler::tick_ms.count(); + int msTimeout = vespalib::count_ms(time_tools().event_timeout()); // obtain I/O events _selector.poll(msTimeout); // sample current time (performed once per event loop iteration) - _now = steady_clock::now(); + _now = time_tools().current_time(); // handle io-events auto dispatchResult = _selector.dispatch(*this); diff --git a/fnet/src/vespa/fnet/transport_thread.h b/fnet/src/vespa/fnet/transport_thread.h index 371544f9c8a..3c00a3fe93b 100644 --- a/fnet/src/vespa/fnet/transport_thread.h +++ b/fnet/src/vespa/fnet/transport_thread.h @@ -14,6 +14,7 @@ #include <condition_variable> #include <chrono> +namespace fnet { struct TimeTools; } class FNET_Transport; class FNET_ControlPacket; class FNET_IPacketStreamer; @@ -138,7 +139,7 @@ private: * @return config object. **/ const FNET_Config & getConfig() const; - + const fnet::TimeTools &time_tools() const; void handle_add_cmd(FNET_IOComponent *ioc); void handle_close_cmd(FNET_IOComponent *ioc); diff --git a/slobrok/src/tests/local_rpc_monitor_map/local_rpc_monitor_map_test.cpp b/slobrok/src/tests/local_rpc_monitor_map/local_rpc_monitor_map_test.cpp index 46f6837c4f1..8624e38c787 100644 --- a/slobrok/src/tests/local_rpc_monitor_map/local_rpc_monitor_map_test.cpp +++ b/slobrok/src/tests/local_rpc_monitor_map/local_rpc_monitor_map_test.cpp @@ -104,7 +104,7 @@ struct LocalRpcMonitorMapTest : public ::testing::Test { ServiceMapping mapping_conflict; LocalRpcMonitorMapTest() : time(duration::zero()), - scheduler(&time, &time), monitor_log(), map_log(), + scheduler(&time), monitor_log(), map_log(), map(&scheduler, [this](auto &owner) { EXPECT_EQ(&owner, &map); |