aboutsummaryrefslogtreecommitdiffstats
path: root/fnet
diff options
context:
space:
mode:
authorHåvard Pettersen <havardpe@oath.com>2021-09-14 09:50:51 +0000
committerHåvard Pettersen <havardpe@oath.com>2021-09-16 11:17:11 +0000
commit851e46a8912ece85870c9780a65dcc314ba3d539 (patch)
tree8affe3a41258f61e1f111ffabfd1ac8ef85a0f91 /fnet
parent20dc2c895ab6cbefb9a02c9d1fdb8a2b543f1867 (diff)
low-level testing of fnet enabled by
1. Manipulating how much time is spent waiting for events while also manipulating how the passing of time is observed by the transport threads. (fnet::TimeTools) 2. Periodic call-backs from FNET_Transport. All transport threads are blocked during these call-backs. Between each call-back, all transport threads perform a single event loop iteration and any async operations not performed in transport threads are flushed. (FNET_Transport::attach_capture_hook) 3. Combining 1 and 2 in a way that lets the test code run all transport threads one iteration at a time, letting the transport threads run concurrently with each other, but no transport thread is allowed to run concurrently with the test code. (fnet::TransportDebugger)
Diffstat (limited to 'fnet')
-rw-r--r--fnet/CMakeLists.txt1
-rw-r--r--fnet/src/tests/scheduling/schedule.cpp2
-rw-r--r--fnet/src/tests/scheduling/sloweventloop.cpp2
-rw-r--r--fnet/src/tests/transport_debugger/CMakeLists.txt8
-rw-r--r--fnet/src/tests/transport_debugger/transport_debugger_test.cpp121
-rw-r--r--fnet/src/vespa/fnet/CMakeLists.txt1
-rw-r--r--fnet/src/vespa/fnet/frt/supervisor.cpp19
-rw-r--r--fnet/src/vespa/fnet/frt/supervisor.h5
-rw-r--r--fnet/src/vespa/fnet/scheduler.cpp18
-rw-r--r--fnet/src/vespa/fnet/scheduler.h8
-rw-r--r--fnet/src/vespa/fnet/transport.cpp92
-rw-r--r--fnet/src/vespa/fnet/transport.h56
-rw-r--r--fnet/src/vespa/fnet/transport_debugger.cpp69
-rw-r--r--fnet/src/vespa/fnet/transport_debugger.h60
-rw-r--r--fnet/src/vespa/fnet/transport_thread.cpp14
-rw-r--r--fnet/src/vespa/fnet/transport_thread.h3
16 files changed, 440 insertions, 39 deletions
diff --git a/fnet/CMakeLists.txt b/fnet/CMakeLists.txt
index 142c19fca3e..3222b36819d 100644
--- a/fnet/CMakeLists.txt
+++ b/fnet/CMakeLists.txt
@@ -29,4 +29,5 @@ vespa_define_module(
src/tests/sync_execute
src/tests/thread_selection
src/tests/time
+ src/tests/transport_debugger
)
diff --git a/fnet/src/tests/scheduling/schedule.cpp b/fnet/src/tests/scheduling/schedule.cpp
index 55fe4c16398..fcc5c0805d9 100644
--- a/fnet/src/tests/scheduling/schedule.cpp
+++ b/fnet/src/tests/scheduling/schedule.cpp
@@ -81,7 +81,7 @@ public:
TEST("schedule") {
_time = steady_time(vespalib::duration::zero());
- _scheduler = new FNET_Scheduler(&_time, &_time);
+ _scheduler = new FNET_Scheduler(&_time);
RealTimeTask rt_task1;
RealTimeTask rt_task2;
diff --git a/fnet/src/tests/scheduling/sloweventloop.cpp b/fnet/src/tests/scheduling/sloweventloop.cpp
index 33070aa29e9..bb07cba6696 100644
--- a/fnet/src/tests/scheduling/sloweventloop.cpp
+++ b/fnet/src/tests/scheduling/sloweventloop.cpp
@@ -20,7 +20,7 @@ public:
TEST("slow event loop") {
vespalib::steady_time t(vespalib::duration::zero());
- FNET_Scheduler scheduler(&t, &t);
+ FNET_Scheduler scheduler(&t);
MyTask task(scheduler);
MyTask task2(scheduler);
diff --git a/fnet/src/tests/transport_debugger/CMakeLists.txt b/fnet/src/tests/transport_debugger/CMakeLists.txt
new file mode 100644
index 00000000000..70dcadaf695
--- /dev/null
+++ b/fnet/src/tests/transport_debugger/CMakeLists.txt
@@ -0,0 +1,8 @@
+# Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+vespa_add_executable(fnet_transport_debugger_test_app TEST
+ SOURCES
+ transport_debugger_test.cpp
+ DEPENDS
+ fnet
+)
+vespa_add_test(NAME fnet_transport_debugger_test_app COMMAND fnet_transport_debugger_test_app)
diff --git a/fnet/src/tests/transport_debugger/transport_debugger_test.cpp b/fnet/src/tests/transport_debugger/transport_debugger_test.cpp
new file mode 100644
index 00000000000..6c2cfb5cd76
--- /dev/null
+++ b/fnet/src/tests/transport_debugger/transport_debugger_test.cpp
@@ -0,0 +1,121 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/vespalib/testkit/test_kit.h>
+#include <vespa/vespalib/testkit/time_bomb.h>
+#include <vespa/fnet/transport.h>
+#include <vespa/fnet/transport_thread.h>
+#include <vespa/fnet/transport_debugger.h>
+#include <vespa/fnet/task.h>
+#include <vespa/fnet/frt/supervisor.h>
+#include <vespa/fnet/frt/rpcrequest.h>
+#include <vespa/fnet/frt/invoker.h>
+#include <vespa/fnet/frt/target.h>
+#include <vespa/vespalib/net/tls/tls_crypto_engine.h>
+#include <vespa/vespalib/test/make_tls_options_for_testing.h>
+
+vespalib::CryptoEngine::SP tls_crypto = std::make_shared<vespalib::TlsCryptoEngine>(vespalib::test::make_tls_options_for_testing());
+
+struct Service : FRT_Invokable {
+ fnet::frt::StandaloneFRT frt;
+ Service(fnet::TimeTools::SP time_tools)
+ : frt(TransportConfig(4).crypto(tls_crypto).time_tools(time_tools))
+ {
+ init_rpc();
+ ASSERT_TRUE(frt.supervisor().Listen(0));
+ }
+ FNET_Transport &transport() { return *frt.supervisor().GetTransport(); }
+ int listen_port() const {
+ return frt.supervisor().GetListenPort();
+ }
+ FRT_Target *connect(int port) {
+ return frt.supervisor().GetTarget(port);
+ }
+ void init_rpc() {
+ FRT_ReflectionBuilder rb(&frt.supervisor());
+ rb.DefineMethod("inc", "l", "l", FRT_METHOD(Service::rpc_inc), this);
+ rb.MethodDesc("increment a 64-bit integer, returns after 5 seconds");
+ rb.ParamDesc("in", "an integer (64 bit)");
+ rb.ReturnDesc("out", "in + 1 (64 bit)");
+ }
+ struct ReturnLater : FNET_Task {
+ FRT_RPCRequest *req;
+ ReturnLater(FNET_Scheduler *scheduler, FRT_RPCRequest *req_in)
+ : FNET_Task(scheduler), req(req_in) {}
+ void PerformTask() override { req->Return(); }
+ };
+ void rpc_inc(FRT_RPCRequest *req) {
+ req->Detach();
+ FRT_Values &params = *req->GetParams();
+ FRT_Values &ret = *req->GetReturn();
+ ret.AddInt64(params[0]._intval64 + 1);
+ auto my_scheduler = req->GetConnection()->Owner()->GetScheduler();
+ auto &task = req->getStash().create<ReturnLater>(my_scheduler, req);
+ task.Schedule(5.0);
+ }
+ ~Service() = default;
+};
+
+struct Fixture {
+ fnet::TransportDebugger debugger;
+ Service server;
+ Service client;
+ Fixture()
+ : debugger(),
+ server(debugger.time_tools()),
+ client(debugger.time_tools())
+ {
+ debugger.attach({server.transport(), client.transport()});
+ }
+ ~Fixture() {
+ debugger.detach();
+ }
+};
+
+struct MyWait : FRT_IRequestWait {
+ FRT_RPCRequest *req = nullptr;
+ void RequestDone(FRT_RPCRequest *r) override { req = r; }
+};
+
+TEST_FF("transport layers can be run with transport debugger", Fixture(), vespalib::TimeBomb(60)) {
+ MyWait w4; // short timeout, should fail
+ MyWait w6; // long timeout, should be ok
+
+ FRT_Target *target = f1.client.connect(f1.server.listen_port());
+
+ FRT_RPCRequest *req4 = f1.client.frt.supervisor().AllocRPCRequest();
+ req4->SetMethodName("inc");
+ req4->GetParams()->AddInt64(3);
+ target->InvokeAsync(req4, 4.0, &w4);
+
+ FRT_RPCRequest *req6 = f1.client.frt.supervisor().AllocRPCRequest();
+ req6->SetMethodName("inc");
+ req6->GetParams()->AddInt64(7);
+ target->InvokeAsync(req6, 6.0, &w6);
+
+ bool got4 = false;
+ bool got6 = false;
+ size_t steps = 0;
+
+ while (!(got4 && got6)) {
+ f1.debugger.step();
+ ++steps;
+ if (!got4 && w4.req) {
+ got4 = true;
+ fprintf(stderr, "request with 4s timeout completed after %zu steps (~%zu ms)\n", steps, steps * 5);
+ }
+ if (!got6 && w6.req) {
+ got6 = true;
+ fprintf(stderr, "request with 6s timeout completed after %zu steps (~%zu ms)\n", steps, steps * 5);
+ }
+ }
+ ASSERT_EQUAL(req4, w4.req);
+ ASSERT_EQUAL(req6, w6.req);
+ EXPECT_EQUAL(req4->GetErrorCode(), FRTE_RPC_TIMEOUT);
+ ASSERT_TRUE(req6->CheckReturnTypes("l"));
+ EXPECT_EQUAL(req6->GetReturn()->GetValue(0)._intval64, 8u);
+ target->SubRef();
+ req4->SubRef();
+ req6->SubRef();
+}
+
+TEST_MAIN() { TEST_RUN_ALL(); }
diff --git a/fnet/src/vespa/fnet/CMakeLists.txt b/fnet/src/vespa/fnet/CMakeLists.txt
index 4b9d818e5ed..8accc83e90a 100644
--- a/fnet/src/vespa/fnet/CMakeLists.txt
+++ b/fnet/src/vespa/fnet/CMakeLists.txt
@@ -19,6 +19,7 @@ vespa_add_library(fnet
simplepacketstreamer.cpp
task.cpp
transport.cpp
+ transport_debugger.cpp
transport_thread.cpp
$<TARGET_OBJECTS:fnet_frt>
INSTALL lib64
diff --git a/fnet/src/vespa/fnet/frt/supervisor.cpp b/fnet/src/vespa/fnet/frt/supervisor.cpp
index d992567f776..3264e80ae7b 100644
--- a/fnet/src/vespa/fnet/frt/supervisor.cpp
+++ b/fnet/src/vespa/fnet/frt/supervisor.cpp
@@ -8,6 +8,7 @@
#include <vespa/fnet/transport_thread.h>
#include <vespa/fnet/connector.h>
#include <vespa/fastos/thread.h>
+#include <vespa/vespalib/util/require.h>
FRT_Supervisor::FRT_Supervisor(FNET_Transport *transport)
: _transport(transport),
@@ -409,21 +410,19 @@ FRT_Supervisor::SchedulerPtr::SchedulerPtr(FNET_TransportThread *transport_threa
namespace fnet::frt {
-StandaloneFRT::StandaloneFRT()
+StandaloneFRT::StandaloneFRT(const TransportConfig &config)
: _threadPool(std::make_unique<FastOS_ThreadPool>(1024*128)),
- _transport(std::make_unique<FNET_Transport>()),
+ _transport(std::make_unique<FNET_Transport>(config)),
_supervisor(std::make_unique<FRT_Supervisor>(_transport.get()))
{
- _transport->Start(_threadPool.get());
+ REQUIRE(_transport->Start(_threadPool.get()));
}
-StandaloneFRT::StandaloneFRT(vespalib::CryptoEngine::SP crypto)
- : _threadPool(std::make_unique<FastOS_ThreadPool>(1024*128)),
- _transport(std::make_unique<FNET_Transport>(TransportConfig().crypto(std::move(crypto)))),
- _supervisor(std::make_unique<FRT_Supervisor>(_transport.get()))
-{
- _transport->Start(_threadPool.get());
-}
+StandaloneFRT::StandaloneFRT()
+ : StandaloneFRT(TransportConfig()) {}
+
+StandaloneFRT::StandaloneFRT(std::shared_ptr<vespalib::CryptoEngine> crypto)
+ : StandaloneFRT(TransportConfig().crypto(std::move(crypto))) {}
StandaloneFRT::~StandaloneFRT()
{
diff --git a/fnet/src/vespa/fnet/frt/supervisor.h b/fnet/src/vespa/fnet/frt/supervisor.h
index 2743cafae26..079fac9c7e9 100644
--- a/fnet/src/vespa/fnet/frt/supervisor.h
+++ b/fnet/src/vespa/fnet/frt/supervisor.h
@@ -10,6 +10,7 @@
#include <vespa/fnet/connection.h>
#include <vespa/fnet/simplepacketstreamer.h>
+class TransportConfig;
class FNET_Transport;
class FRT_Target;
class FastOS_ThreadPool;
@@ -129,10 +130,12 @@ namespace fnet::frt {
*/
class StandaloneFRT {
public:
+ explicit StandaloneFRT(const TransportConfig &config);
StandaloneFRT();
explicit StandaloneFRT(std::shared_ptr<vespalib::CryptoEngine> crypto);
~StandaloneFRT();
- FRT_Supervisor & supervisor() { return *_supervisor; }
+ FRT_Supervisor &supervisor() { return *_supervisor; }
+ const FRT_Supervisor &supervisor() const { return *_supervisor; }
void shutdown();
private:
std::unique_ptr<FastOS_ThreadPool> _threadPool;
diff --git a/fnet/src/vespa/fnet/scheduler.cpp b/fnet/src/vespa/fnet/scheduler.cpp
index f54d3d6a9bb..9327c52c308 100644
--- a/fnet/src/vespa/fnet/scheduler.cpp
+++ b/fnet/src/vespa/fnet/scheduler.cpp
@@ -9,8 +9,7 @@
LOG_SETUP(".fnet.scheduler");
-FNET_Scheduler::FNET_Scheduler(vespalib::steady_time *sampler,
- vespalib::steady_time *now)
+FNET_Scheduler::FNET_Scheduler(vespalib::steady_time *sampler)
: _cond(),
_next(),
_now(),
@@ -25,13 +24,8 @@ FNET_Scheduler::FNET_Scheduler(vespalib::steady_time *sampler,
for (int i = 0; i < NUM_SLOTS; i++)
_slots[i] = nullptr;
_slots[NUM_SLOTS] = nullptr;
-
- if (now != nullptr) {
- _next = *now;
- } else {
- _next = vespalib::steady_clock::now();
- }
- _next += tick_ms;
+ _now = _sampler ? *_sampler : vespalib::steady_clock::now();
+ _next = _now + tick_ms;
}
@@ -143,11 +137,7 @@ FNET_Scheduler::Print(FILE *dst)
void
FNET_Scheduler::CheckTasks()
{
- if (_sampler != nullptr) {
- _now = *_sampler;
- } else {
- _now = vespalib::steady_clock::now();
- }
+ _now = _sampler ? *_sampler : vespalib::steady_clock::now();
// assume timely value propagation
diff --git a/fnet/src/vespa/fnet/scheduler.h b/fnet/src/vespa/fnet/scheduler.h
index 98a9442799e..e5494f229fa 100644
--- a/fnet/src/vespa/fnet/scheduler.h
+++ b/fnet/src/vespa/fnet/scheduler.h
@@ -68,12 +68,10 @@ public:
* @param sampler if given, this object will be used to obtain the
* time when the @ref CheckTasks method is invoked. If a
* sampler is not given, time sampling will be
- * handled internally.
- * @param now if given, indicates the current time. This value is
- * used by the constructor to init internal variables.
+ * handled internally. The sampler will also be used by
+ * the constructor to init internal variables.
**/
- FNET_Scheduler(vespalib::steady_time *sampler = nullptr,
- vespalib::steady_time *now = nullptr);
+ FNET_Scheduler(vespalib::steady_time *sampler = nullptr);
virtual ~FNET_Scheduler();
diff --git a/fnet/src/vespa/fnet/transport.cpp b/fnet/src/vespa/fnet/transport.cpp
index 81d35f9852c..a7ec883b550 100644
--- a/fnet/src/vespa/fnet/transport.cpp
+++ b/fnet/src/vespa/fnet/transport.cpp
@@ -5,6 +5,7 @@
#include "iocomponent.h"
#include <vespa/vespalib/util/threadstackexecutor.h>
#include <vespa/vespalib/util/size_literals.h>
+#include <vespa/vespalib/util/rendezvous.h>
#include <chrono>
#include <xxhash.h>
@@ -24,12 +25,83 @@ struct HashState {
VESPA_THREAD_STACK_TAG(fnet_work_pool);
+struct DefaultTimeTools : fnet::TimeTools {
+ vespalib::duration event_timeout() const override {
+ return FNET_Scheduler::tick_ms;
+ }
+ vespalib::steady_time current_time() const override {
+ return vespalib::steady_clock::now();
+ }
+};
+
+struct DebugTimeTools : fnet::TimeTools {
+ vespalib::duration my_event_timeout;
+ std::function<vespalib::steady_time()> my_current_time;
+ DebugTimeTools(vespalib::duration d, std::function<vespalib::steady_time()> f) noexcept
+ : my_event_timeout(d), my_current_time(std::move(f)) {}
+ vespalib::duration event_timeout() const override {
+ return my_event_timeout;
+ }
+ vespalib::steady_time current_time() const override {
+ return my_current_time();
+ }
+};
+
+struct CaptureMeet : vespalib::Rendezvous<int,bool> {
+ using SP = std::shared_ptr<CaptureMeet>;
+ vespalib::SyncableThreadExecutor &work_pool;
+ vespalib::AsyncResolver &async_resolver;
+ std::function<bool()> capture_hook;
+ CaptureMeet(size_t N,
+ vespalib::SyncableThreadExecutor &work_pool_in,
+ vespalib::AsyncResolver &resolver_in,
+ std::function<bool()> capture_hook_in)
+ : vespalib::Rendezvous<int,bool>(N),
+ work_pool(work_pool_in),
+ async_resolver(resolver_in),
+ capture_hook(std::move(capture_hook_in)) {}
+ void mingle() override {
+ work_pool.sync();
+ async_resolver.wait_for_pending_resolves();
+ bool result = capture_hook();
+ for (size_t i = 0; i < size(); ++i) {
+ out(i) = result;
+ }
+ }
+};
+
+struct CaptureTask : FNET_Task {
+ CaptureMeet::SP meet;
+ CaptureTask(FNET_Scheduler *scheduler, CaptureMeet::SP meet_in)
+ : FNET_Task(scheduler), meet(std::move(meet_in)) {}
+ void PerformTask() override {
+ int dummy_value = 0; // rendezvous must have input value
+ if (meet->rendezvous(dummy_value)) {
+ ScheduleNow();
+ } else {
+ delete this;
+ }
+ };
+};
+
} // namespace <unnamed>
+namespace fnet {
+
+TimeTools::SP
+TimeTools::make_debug(vespalib::duration event_timeout,
+ std::function<vespalib::steady_time()> current_time)
+{
+ return std::make_shared<DebugTimeTools>(event_timeout, std::move(current_time));
+}
+
+} // fnet
+
TransportConfig::TransportConfig(int num_threads)
: _config(),
_resolver(),
_crypto(),
+ _time_tools(),
_num_threads(num_threads)
{}
@@ -39,14 +111,21 @@ vespalib::AsyncResolver::SP
TransportConfig::resolver() const {
return _resolver ? _resolver : vespalib::AsyncResolver::get_shared();
}
+
vespalib::CryptoEngine::SP
TransportConfig::crypto() const {
return _crypto ? _crypto : vespalib::CryptoEngine::get_default();
}
-FNET_Transport::FNET_Transport(TransportConfig cfg)
+fnet::TimeTools::SP
+TransportConfig::time_tools() const {
+ return _time_tools ? _time_tools : std::make_shared<DefaultTimeTools>();
+}
+
+FNET_Transport::FNET_Transport(const TransportConfig &cfg)
: _async_resolver(cfg.resolver()),
_crypto_engine(cfg.crypto()),
+ _time_tools(cfg.time_tools()),
_work_pool(std::make_unique<vespalib::ThreadStackExecutor>(1, 128_Ki, fnet_work_pool, 1024)),
_threads(),
_config(cfg.config())
@@ -175,6 +254,17 @@ FNET_Transport::Start(FastOS_ThreadPool *pool)
}
void
+FNET_Transport::attach_capture_hook(std::function<bool()> capture_hook)
+{
+ auto meet = std::make_shared<CaptureMeet>(_threads.size(), *_work_pool, *_async_resolver, std::move(capture_hook));
+ for (auto &thread: _threads) {
+ // tasks will be deleted when the capture_hook returns false
+ auto *task = new CaptureTask(thread->GetScheduler(), meet);
+ task->ScheduleNow();
+ }
+}
+
+void
FNET_Transport::Add(FNET_IOComponent *comp, bool needRef) {
comp->Owner()->Add(comp, needRef);
}
diff --git a/fnet/src/vespa/fnet/transport.h b/fnet/src/vespa/fnet/transport.h
index 6a59f9da66c..117bf6c93b8 100644
--- a/fnet/src/vespa/fnet/transport.h
+++ b/fnet/src/vespa/fnet/transport.h
@@ -8,6 +8,7 @@
#include <vector>
#include <vespa/vespalib/net/async_resolver.h>
#include <vespa/vespalib/net/crypto_engine.h>
+#include <vespa/vespalib/util/time.h>
class FNET_TransportThread;
class FastOS_ThreadPool;
@@ -17,6 +18,29 @@ class FNET_IServerAdapter;
class FNET_IPacketHandler;
class FNET_Scheduler;
+namespace fnet {
+
+/**
+ * Low-level abstraction for event-loop time management. The
+ * event_timeout function returns the timeout to be used when waiting
+ * for io-events. The current_time function returns the current
+ * time. This interface may be implemented to control both how time is
+ * spent (event_timeout) as well as how time is observed
+ * (current_time). The default implementation will use
+ * FNET_Scheduler::tick_ms as event timeout and
+ * vespalib::steady_clock::now() as current time.
+ **/
+struct TimeTools {
+ using SP = std::shared_ptr<TimeTools>;
+ virtual vespalib::duration event_timeout() const = 0;
+ virtual vespalib::steady_time current_time() const = 0;
+ virtual ~TimeTools() = default;
+ static TimeTools::SP make_debug(vespalib::duration event_timeout,
+ std::function<vespalib::steady_time()> current_time);
+};
+
+} // fnet
+
class TransportConfig {
public:
TransportConfig() : TransportConfig(1) {}
@@ -24,6 +48,7 @@ public:
~TransportConfig();
vespalib::AsyncResolver::SP resolver() const;
vespalib::CryptoEngine::SP crypto() const;
+ fnet::TimeTools::SP time_tools() const;
TransportConfig & resolver(vespalib::AsyncResolver::SP resolver_in) {
_resolver = std::move(resolver_in);
return *this;
@@ -32,6 +57,11 @@ public:
_crypto = std::move(crypto_in);
return *this;
}
+ TransportConfig &time_tools(fnet::TimeTools::SP time_tools_in) {
+ _time_tools = std::move(time_tools_in);
+ return *this;
+ }
+
const FNET_Config & config() const { return _config; }
uint32_t num_threads() const { return _num_threads; }
@@ -62,8 +92,10 @@ private:
FNET_Config _config;
vespalib::AsyncResolver::SP _resolver;
vespalib::CryptoEngine::SP _crypto;
+ fnet::TimeTools::SP _time_tools;
uint32_t _num_threads;
};
+
/**
* This class represents the transport layer and handles a collection
* of transport threads. Note: remember to shut down your transport
@@ -77,6 +109,7 @@ private:
vespalib::AsyncResolver::SP _async_resolver;
vespalib::CryptoEngine::SP _crypto_engine;
+ fnet::TimeTools::SP _time_tools;
std::unique_ptr<vespalib::SyncableThreadExecutor> _work_pool;
Threads _threads;
const FNET_Config _config;
@@ -91,7 +124,7 @@ public:
* the current thread become the transport thread. Main may only
* be called for single-threaded transports.
**/
- explicit FNET_Transport(TransportConfig config);
+ explicit FNET_Transport(const TransportConfig &config);
explicit FNET_Transport(uint32_t num_threads)
: FNET_Transport(TransportConfig(num_threads)) {}
@@ -100,6 +133,7 @@ public:
~FNET_Transport();
const FNET_Config & getConfig() const { return _config; }
+ const fnet::TimeTools &time_tools() const { return *_time_tools; }
/**
* Try to execute the given task on the internal work pool
@@ -284,6 +318,26 @@ public:
**/
bool Start(FastOS_ThreadPool *pool);
+ /**
+ * Capture transport threads. Used for testing purposes,
+ * preferably combined with a debug variant of TimeTools.
+ *
+ * After this function is called, the capture_hook will be called
+ * repeatedly as long as it returns true. The first time it
+ * returns false, appropriate cleanup will be performed and the
+ * capture_hook will never be called again; it detaches
+ * itself. All transport threads will be blocked while the
+ * capture_hook is called. Between calls to the capture_hook each
+ * transport thread will run its event loop exactly once, all
+ * pending work in the work pool will be performed and all pending
+ * dns lookups will be performed. Note that the capture_hook
+ * should detach itself by returning false before the transport
+ * itself is shut down.
+ *
+ * @param capture_hook called until it returns false
+ **/
+ void attach_capture_hook(std::function<bool()> capture_hook);
+
//-------------------------------------------------------------------------
// forward async IO Component operations to their owners
//-------------------------------------------------------------------------
diff --git a/fnet/src/vespa/fnet/transport_debugger.cpp b/fnet/src/vespa/fnet/transport_debugger.cpp
new file mode 100644
index 00000000000..1878b921171
--- /dev/null
+++ b/fnet/src/vespa/fnet/transport_debugger.cpp
@@ -0,0 +1,69 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "transport_debugger.h"
+#include <vespa/vespalib/util/require.h>
+#include <cassert>
+
+namespace fnet {
+
+void
+TransportDebugger::Meet::mingle()
+{
+ bool call_again = true;
+ for (size_t i = 0; i < size(); ++i) {
+ if (!in(i)) {
+ call_again = false;
+ }
+ }
+ for (size_t i = 0; i < size(); ++i) {
+ out(i) = call_again;
+ }
+}
+
+TransportDebugger::TransportDebugger()
+ : _time(),
+ _meet()
+{
+}
+
+TransportDebugger::~TransportDebugger()
+{
+ assert(!_meet && "error: still attached");
+}
+
+void
+TransportDebugger::attach(std::initializer_list<std::reference_wrapper<FNET_Transport> > list)
+{
+ size_t N = list.size() + 1;
+ REQUIRE(!_meet);
+ REQUIRE(N > 1);
+ _meet = std::make_shared<Meet>(N);
+ for (auto &item: list) {
+ item.get().attach_capture_hook([meet = _meet]()
+ {
+ REQUIRE(meet->rendezvous(true));
+ // capture point: between meetings
+ return meet->rendezvous(true);
+ });
+ }
+ REQUIRE(_meet->rendezvous(true)); // capture transport threads
+}
+
+void
+TransportDebugger::step()
+{
+ REQUIRE(_meet);
+ _time += 5ms; // pretend 5ms passes between each event loop iteration
+ REQUIRE(_meet->rendezvous(true)); // release transport threads
+ REQUIRE(_meet->rendezvous(true)); // capture transport threads
+}
+
+void
+TransportDebugger::detach()
+{
+ REQUIRE(_meet);
+ REQUIRE(!_meet->rendezvous(false)); // release transport threads (final time)
+ _meet.reset();
+}
+
+}
diff --git a/fnet/src/vespa/fnet/transport_debugger.h b/fnet/src/vespa/fnet/transport_debugger.h
new file mode 100644
index 00000000000..ed3738bb9fe
--- /dev/null
+++ b/fnet/src/vespa/fnet/transport_debugger.h
@@ -0,0 +1,60 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "transport.h"
+#include <vespa/vespalib/util/rendezvous.h>
+#include <vespa/vespalib/util/time.h>
+#include <memory>
+
+namespace fnet {
+
+/**
+ * This class is used to control transport threads during unit
+ * testing.
+ *
+ * The TimeTools created by this class should be used when setting up
+ * all transports used in the test. The supplied TimeTools will make
+ * sure no thread ever blocks waiting for io-events and also make sure
+ * all threads observe the same externally controlled current
+ * time. After the transport layers are started, the attach function
+ * is used to start controlling event loop execution. While attached,
+ * calling the step function will run each transport thread event loop
+ * exactly once (in parallel), wait for pending dns resolving, wait
+ * for pending tls handshake work and advance the current time with
+ * 5ms (making sure 'time passes' and 'stuff happens' at a reasonable
+ * relative rate). It is important to call detach to release the
+ * transports before trying to shut them down.
+ *
+ * Note that both server and client should be controlled by the same
+ * debugger when testing rpc. Using external services will result in
+ * (synthetic) time passing too fast compared to stuff actually
+ * happening, since you do not control the other end-point in the same
+ * way.
+ *
+ * Take a look at the unit test for this class for an example of how
+ * to use it.
+ **/
+class TransportDebugger
+{
+private:
+ struct Meet : vespalib::Rendezvous<bool,bool> {
+ Meet(size_t N) : vespalib::Rendezvous<bool,bool>(N) {}
+ void mingle() override;
+ };
+ vespalib::steady_time _time;
+ std::shared_ptr<Meet> _meet;
+
+public:
+ TransportDebugger();
+ ~TransportDebugger();
+ vespalib::steady_time time() const { return _time; }
+ TimeTools::SP time_tools() {
+ return TimeTools::make_debug(vespalib::duration::zero(), [this]() noexcept { return time(); });
+ }
+ void attach(std::initializer_list<std::reference_wrapper<FNET_Transport> > list);
+ void step();
+ void detach();
+};
+
+}
diff --git a/fnet/src/vespa/fnet/transport_thread.cpp b/fnet/src/vespa/fnet/transport_thread.cpp
index 5e1a9759a60..760c951fedd 100644
--- a/fnet/src/vespa/fnet/transport_thread.cpp
+++ b/fnet/src/vespa/fnet/transport_thread.cpp
@@ -207,7 +207,7 @@ extern "C" {
FNET_TransportThread::FNET_TransportThread(FNET_Transport &owner_in)
: _owner(owner_in),
- _now(steady_clock ::now()),
+ _now(owner_in.time_tools().current_time()),
_scheduler(&_now),
_componentsHead(nullptr),
_timeOutHead(nullptr),
@@ -246,6 +246,12 @@ FNET_TransportThread::getConfig() const {
return _owner.getConfig();
}
+const fnet::TimeTools &
+FNET_TransportThread::time_tools() const
+{
+ return _owner.time_tools();
+}
+
bool
FNET_TransportThread::tune(SocketHandle &handle) const
{
@@ -388,7 +394,7 @@ FNET_TransportThread::InitEventLoop()
LOG(error, "Transport: InitEventLoop: object already active!");
return false;
}
- _now = steady_clock::now();
+ _now = time_tools().current_time();
return true;
}
@@ -465,12 +471,12 @@ bool
FNET_TransportThread::EventLoopIteration() {
if (!IsShutDown()) {
- int msTimeout = FNET_Scheduler::tick_ms.count();
+ int msTimeout = vespalib::count_ms(time_tools().event_timeout());
// obtain I/O events
_selector.poll(msTimeout);
// sample current time (performed once per event loop iteration)
- _now = steady_clock::now();
+ _now = time_tools().current_time();
// handle io-events
auto dispatchResult = _selector.dispatch(*this);
diff --git a/fnet/src/vespa/fnet/transport_thread.h b/fnet/src/vespa/fnet/transport_thread.h
index 371544f9c8a..3c00a3fe93b 100644
--- a/fnet/src/vespa/fnet/transport_thread.h
+++ b/fnet/src/vespa/fnet/transport_thread.h
@@ -14,6 +14,7 @@
#include <condition_variable>
#include <chrono>
+namespace fnet { struct TimeTools; }
class FNET_Transport;
class FNET_ControlPacket;
class FNET_IPacketStreamer;
@@ -138,7 +139,7 @@ private:
* @return config object.
**/
const FNET_Config & getConfig() const;
-
+ const fnet::TimeTools &time_tools() const;
void handle_add_cmd(FNET_IOComponent *ioc);
void handle_close_cmd(FNET_IOComponent *ioc);