summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--fnet/CMakeLists.txt1
-rw-r--r--fnet/src/tests/scheduling/schedule.cpp2
-rw-r--r--fnet/src/tests/scheduling/sloweventloop.cpp2
-rw-r--r--fnet/src/tests/transport_debugger/CMakeLists.txt8
-rw-r--r--fnet/src/tests/transport_debugger/transport_debugger_test.cpp121
-rw-r--r--fnet/src/vespa/fnet/CMakeLists.txt1
-rw-r--r--fnet/src/vespa/fnet/frt/supervisor.cpp19
-rw-r--r--fnet/src/vespa/fnet/frt/supervisor.h5
-rw-r--r--fnet/src/vespa/fnet/scheduler.cpp18
-rw-r--r--fnet/src/vespa/fnet/scheduler.h8
-rw-r--r--fnet/src/vespa/fnet/transport.cpp92
-rw-r--r--fnet/src/vespa/fnet/transport.h56
-rw-r--r--fnet/src/vespa/fnet/transport_debugger.cpp69
-rw-r--r--fnet/src/vespa/fnet/transport_debugger.h60
-rw-r--r--fnet/src/vespa/fnet/transport_thread.cpp14
-rw-r--r--fnet/src/vespa/fnet/transport_thread.h3
-rw-r--r--slobrok/src/tests/local_rpc_monitor_map/local_rpc_monitor_map_test.cpp2
17 files changed, 441 insertions, 40 deletions
diff --git a/fnet/CMakeLists.txt b/fnet/CMakeLists.txt
index 142c19fca3e..3222b36819d 100644
--- a/fnet/CMakeLists.txt
+++ b/fnet/CMakeLists.txt
@@ -29,4 +29,5 @@ vespa_define_module(
src/tests/sync_execute
src/tests/thread_selection
src/tests/time
+ src/tests/transport_debugger
)
diff --git a/fnet/src/tests/scheduling/schedule.cpp b/fnet/src/tests/scheduling/schedule.cpp
index 55fe4c16398..fcc5c0805d9 100644
--- a/fnet/src/tests/scheduling/schedule.cpp
+++ b/fnet/src/tests/scheduling/schedule.cpp
@@ -81,7 +81,7 @@ public:
TEST("schedule") {
_time = steady_time(vespalib::duration::zero());
- _scheduler = new FNET_Scheduler(&_time, &_time);
+ _scheduler = new FNET_Scheduler(&_time);
RealTimeTask rt_task1;
RealTimeTask rt_task2;
diff --git a/fnet/src/tests/scheduling/sloweventloop.cpp b/fnet/src/tests/scheduling/sloweventloop.cpp
index 33070aa29e9..bb07cba6696 100644
--- a/fnet/src/tests/scheduling/sloweventloop.cpp
+++ b/fnet/src/tests/scheduling/sloweventloop.cpp
@@ -20,7 +20,7 @@ public:
TEST("slow event loop") {
vespalib::steady_time t(vespalib::duration::zero());
- FNET_Scheduler scheduler(&t, &t);
+ FNET_Scheduler scheduler(&t);
MyTask task(scheduler);
MyTask task2(scheduler);
diff --git a/fnet/src/tests/transport_debugger/CMakeLists.txt b/fnet/src/tests/transport_debugger/CMakeLists.txt
new file mode 100644
index 00000000000..70dcadaf695
--- /dev/null
+++ b/fnet/src/tests/transport_debugger/CMakeLists.txt
@@ -0,0 +1,8 @@
+# Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+vespa_add_executable(fnet_transport_debugger_test_app TEST
+ SOURCES
+ transport_debugger_test.cpp
+ DEPENDS
+ fnet
+)
+vespa_add_test(NAME fnet_transport_debugger_test_app COMMAND fnet_transport_debugger_test_app)
diff --git a/fnet/src/tests/transport_debugger/transport_debugger_test.cpp b/fnet/src/tests/transport_debugger/transport_debugger_test.cpp
new file mode 100644
index 00000000000..6c2cfb5cd76
--- /dev/null
+++ b/fnet/src/tests/transport_debugger/transport_debugger_test.cpp
@@ -0,0 +1,121 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/vespalib/testkit/test_kit.h>
+#include <vespa/vespalib/testkit/time_bomb.h>
+#include <vespa/fnet/transport.h>
+#include <vespa/fnet/transport_thread.h>
+#include <vespa/fnet/transport_debugger.h>
+#include <vespa/fnet/task.h>
+#include <vespa/fnet/frt/supervisor.h>
+#include <vespa/fnet/frt/rpcrequest.h>
+#include <vespa/fnet/frt/invoker.h>
+#include <vespa/fnet/frt/target.h>
+#include <vespa/vespalib/net/tls/tls_crypto_engine.h>
+#include <vespa/vespalib/test/make_tls_options_for_testing.h>
+
+vespalib::CryptoEngine::SP tls_crypto = std::make_shared<vespalib::TlsCryptoEngine>(vespalib::test::make_tls_options_for_testing());
+
+struct Service : FRT_Invokable {
+ fnet::frt::StandaloneFRT frt;
+ Service(fnet::TimeTools::SP time_tools)
+ : frt(TransportConfig(4).crypto(tls_crypto).time_tools(time_tools))
+ {
+ init_rpc();
+ ASSERT_TRUE(frt.supervisor().Listen(0));
+ }
+ FNET_Transport &transport() { return *frt.supervisor().GetTransport(); }
+ int listen_port() const {
+ return frt.supervisor().GetListenPort();
+ }
+ FRT_Target *connect(int port) {
+ return frt.supervisor().GetTarget(port);
+ }
+ void init_rpc() {
+ FRT_ReflectionBuilder rb(&frt.supervisor());
+ rb.DefineMethod("inc", "l", "l", FRT_METHOD(Service::rpc_inc), this);
+ rb.MethodDesc("increment a 64-bit integer, returns after 5 seconds");
+ rb.ParamDesc("in", "an integer (64 bit)");
+ rb.ReturnDesc("out", "in + 1 (64 bit)");
+ }
+ struct ReturnLater : FNET_Task {
+ FRT_RPCRequest *req;
+ ReturnLater(FNET_Scheduler *scheduler, FRT_RPCRequest *req_in)
+ : FNET_Task(scheduler), req(req_in) {}
+ void PerformTask() override { req->Return(); }
+ };
+ void rpc_inc(FRT_RPCRequest *req) {
+ req->Detach();
+ FRT_Values &params = *req->GetParams();
+ FRT_Values &ret = *req->GetReturn();
+ ret.AddInt64(params[0]._intval64 + 1);
+ auto my_scheduler = req->GetConnection()->Owner()->GetScheduler();
+ auto &task = req->getStash().create<ReturnLater>(my_scheduler, req);
+ task.Schedule(5.0);
+ }
+ ~Service() = default;
+};
+
+struct Fixture {
+ fnet::TransportDebugger debugger;
+ Service server;
+ Service client;
+ Fixture()
+ : debugger(),
+ server(debugger.time_tools()),
+ client(debugger.time_tools())
+ {
+ debugger.attach({server.transport(), client.transport()});
+ }
+ ~Fixture() {
+ debugger.detach();
+ }
+};
+
+struct MyWait : FRT_IRequestWait {
+ FRT_RPCRequest *req = nullptr;
+ void RequestDone(FRT_RPCRequest *r) override { req = r; }
+};
+
+TEST_FF("transport layers can be run with transport debugger", Fixture(), vespalib::TimeBomb(60)) {
+ MyWait w4; // short timeout, should fail
+ MyWait w6; // long timeout, should be ok
+
+ FRT_Target *target = f1.client.connect(f1.server.listen_port());
+
+ FRT_RPCRequest *req4 = f1.client.frt.supervisor().AllocRPCRequest();
+ req4->SetMethodName("inc");
+ req4->GetParams()->AddInt64(3);
+ target->InvokeAsync(req4, 4.0, &w4);
+
+ FRT_RPCRequest *req6 = f1.client.frt.supervisor().AllocRPCRequest();
+ req6->SetMethodName("inc");
+ req6->GetParams()->AddInt64(7);
+ target->InvokeAsync(req6, 6.0, &w6);
+
+ bool got4 = false;
+ bool got6 = false;
+ size_t steps = 0;
+
+ while (!(got4 && got6)) {
+ f1.debugger.step();
+ ++steps;
+ if (!got4 && w4.req) {
+ got4 = true;
+ fprintf(stderr, "request with 4s timeout completed after %zu steps (~%zu ms)\n", steps, steps * 5);
+ }
+ if (!got6 && w6.req) {
+ got6 = true;
+ fprintf(stderr, "request with 6s timeout completed after %zu steps (~%zu ms)\n", steps, steps * 5);
+ }
+ }
+ ASSERT_EQUAL(req4, w4.req);
+ ASSERT_EQUAL(req6, w6.req);
+ EXPECT_EQUAL(req4->GetErrorCode(), FRTE_RPC_TIMEOUT);
+ ASSERT_TRUE(req6->CheckReturnTypes("l"));
+ EXPECT_EQUAL(req6->GetReturn()->GetValue(0)._intval64, 8u);
+ target->SubRef();
+ req4->SubRef();
+ req6->SubRef();
+}
+
+TEST_MAIN() { TEST_RUN_ALL(); }
diff --git a/fnet/src/vespa/fnet/CMakeLists.txt b/fnet/src/vespa/fnet/CMakeLists.txt
index 4b9d818e5ed..8accc83e90a 100644
--- a/fnet/src/vespa/fnet/CMakeLists.txt
+++ b/fnet/src/vespa/fnet/CMakeLists.txt
@@ -19,6 +19,7 @@ vespa_add_library(fnet
simplepacketstreamer.cpp
task.cpp
transport.cpp
+ transport_debugger.cpp
transport_thread.cpp
$<TARGET_OBJECTS:fnet_frt>
INSTALL lib64
diff --git a/fnet/src/vespa/fnet/frt/supervisor.cpp b/fnet/src/vespa/fnet/frt/supervisor.cpp
index d992567f776..3264e80ae7b 100644
--- a/fnet/src/vespa/fnet/frt/supervisor.cpp
+++ b/fnet/src/vespa/fnet/frt/supervisor.cpp
@@ -8,6 +8,7 @@
#include <vespa/fnet/transport_thread.h>
#include <vespa/fnet/connector.h>
#include <vespa/fastos/thread.h>
+#include <vespa/vespalib/util/require.h>
FRT_Supervisor::FRT_Supervisor(FNET_Transport *transport)
: _transport(transport),
@@ -409,21 +410,19 @@ FRT_Supervisor::SchedulerPtr::SchedulerPtr(FNET_TransportThread *transport_threa
namespace fnet::frt {
-StandaloneFRT::StandaloneFRT()
+StandaloneFRT::StandaloneFRT(const TransportConfig &config)
: _threadPool(std::make_unique<FastOS_ThreadPool>(1024*128)),
- _transport(std::make_unique<FNET_Transport>()),
+ _transport(std::make_unique<FNET_Transport>(config)),
_supervisor(std::make_unique<FRT_Supervisor>(_transport.get()))
{
- _transport->Start(_threadPool.get());
+ REQUIRE(_transport->Start(_threadPool.get()));
}
-StandaloneFRT::StandaloneFRT(vespalib::CryptoEngine::SP crypto)
- : _threadPool(std::make_unique<FastOS_ThreadPool>(1024*128)),
- _transport(std::make_unique<FNET_Transport>(TransportConfig().crypto(std::move(crypto)))),
- _supervisor(std::make_unique<FRT_Supervisor>(_transport.get()))
-{
- _transport->Start(_threadPool.get());
-}
+StandaloneFRT::StandaloneFRT()
+ : StandaloneFRT(TransportConfig()) {}
+
+StandaloneFRT::StandaloneFRT(std::shared_ptr<vespalib::CryptoEngine> crypto)
+ : StandaloneFRT(TransportConfig().crypto(std::move(crypto))) {}
StandaloneFRT::~StandaloneFRT()
{
diff --git a/fnet/src/vespa/fnet/frt/supervisor.h b/fnet/src/vespa/fnet/frt/supervisor.h
index 2743cafae26..079fac9c7e9 100644
--- a/fnet/src/vespa/fnet/frt/supervisor.h
+++ b/fnet/src/vespa/fnet/frt/supervisor.h
@@ -10,6 +10,7 @@
#include <vespa/fnet/connection.h>
#include <vespa/fnet/simplepacketstreamer.h>
+class TransportConfig;
class FNET_Transport;
class FRT_Target;
class FastOS_ThreadPool;
@@ -129,10 +130,12 @@ namespace fnet::frt {
*/
class StandaloneFRT {
public:
+ explicit StandaloneFRT(const TransportConfig &config);
StandaloneFRT();
explicit StandaloneFRT(std::shared_ptr<vespalib::CryptoEngine> crypto);
~StandaloneFRT();
- FRT_Supervisor & supervisor() { return *_supervisor; }
+ FRT_Supervisor &supervisor() { return *_supervisor; }
+ const FRT_Supervisor &supervisor() const { return *_supervisor; }
void shutdown();
private:
std::unique_ptr<FastOS_ThreadPool> _threadPool;
diff --git a/fnet/src/vespa/fnet/scheduler.cpp b/fnet/src/vespa/fnet/scheduler.cpp
index f54d3d6a9bb..9327c52c308 100644
--- a/fnet/src/vespa/fnet/scheduler.cpp
+++ b/fnet/src/vespa/fnet/scheduler.cpp
@@ -9,8 +9,7 @@
LOG_SETUP(".fnet.scheduler");
-FNET_Scheduler::FNET_Scheduler(vespalib::steady_time *sampler,
- vespalib::steady_time *now)
+FNET_Scheduler::FNET_Scheduler(vespalib::steady_time *sampler)
: _cond(),
_next(),
_now(),
@@ -25,13 +24,8 @@ FNET_Scheduler::FNET_Scheduler(vespalib::steady_time *sampler,
for (int i = 0; i < NUM_SLOTS; i++)
_slots[i] = nullptr;
_slots[NUM_SLOTS] = nullptr;
-
- if (now != nullptr) {
- _next = *now;
- } else {
- _next = vespalib::steady_clock::now();
- }
- _next += tick_ms;
+ _now = _sampler ? *_sampler : vespalib::steady_clock::now();
+ _next = _now + tick_ms;
}
@@ -143,11 +137,7 @@ FNET_Scheduler::Print(FILE *dst)
void
FNET_Scheduler::CheckTasks()
{
- if (_sampler != nullptr) {
- _now = *_sampler;
- } else {
- _now = vespalib::steady_clock::now();
- }
+ _now = _sampler ? *_sampler : vespalib::steady_clock::now();
// assume timely value propagation
diff --git a/fnet/src/vespa/fnet/scheduler.h b/fnet/src/vespa/fnet/scheduler.h
index 98a9442799e..e5494f229fa 100644
--- a/fnet/src/vespa/fnet/scheduler.h
+++ b/fnet/src/vespa/fnet/scheduler.h
@@ -68,12 +68,10 @@ public:
* @param sampler if given, this object will be used to obtain the
* time when the @ref CheckTasks method is invoked. If a
* sampler is not given, time sampling will be
- * handled internally.
- * @param now if given, indicates the current time. This value is
- * used by the constructor to init internal variables.
+ * handled internally. The sampler will also be used by
+ * the constructor to init internal variables.
**/
- FNET_Scheduler(vespalib::steady_time *sampler = nullptr,
- vespalib::steady_time *now = nullptr);
+ FNET_Scheduler(vespalib::steady_time *sampler = nullptr);
virtual ~FNET_Scheduler();
diff --git a/fnet/src/vespa/fnet/transport.cpp b/fnet/src/vespa/fnet/transport.cpp
index 81d35f9852c..a7ec883b550 100644
--- a/fnet/src/vespa/fnet/transport.cpp
+++ b/fnet/src/vespa/fnet/transport.cpp
@@ -5,6 +5,7 @@
#include "iocomponent.h"
#include <vespa/vespalib/util/threadstackexecutor.h>
#include <vespa/vespalib/util/size_literals.h>
+#include <vespa/vespalib/util/rendezvous.h>
#include <chrono>
#include <xxhash.h>
@@ -24,12 +25,83 @@ struct HashState {
VESPA_THREAD_STACK_TAG(fnet_work_pool);
+struct DefaultTimeTools : fnet::TimeTools {
+ vespalib::duration event_timeout() const override {
+ return FNET_Scheduler::tick_ms;
+ }
+ vespalib::steady_time current_time() const override {
+ return vespalib::steady_clock::now();
+ }
+};
+
+struct DebugTimeTools : fnet::TimeTools {
+ vespalib::duration my_event_timeout;
+ std::function<vespalib::steady_time()> my_current_time;
+ DebugTimeTools(vespalib::duration d, std::function<vespalib::steady_time()> f) noexcept
+ : my_event_timeout(d), my_current_time(std::move(f)) {}
+ vespalib::duration event_timeout() const override {
+ return my_event_timeout;
+ }
+ vespalib::steady_time current_time() const override {
+ return my_current_time();
+ }
+};
+
+struct CaptureMeet : vespalib::Rendezvous<int,bool> {
+ using SP = std::shared_ptr<CaptureMeet>;
+ vespalib::SyncableThreadExecutor &work_pool;
+ vespalib::AsyncResolver &async_resolver;
+ std::function<bool()> capture_hook;
+ CaptureMeet(size_t N,
+ vespalib::SyncableThreadExecutor &work_pool_in,
+ vespalib::AsyncResolver &resolver_in,
+ std::function<bool()> capture_hook_in)
+ : vespalib::Rendezvous<int,bool>(N),
+ work_pool(work_pool_in),
+ async_resolver(resolver_in),
+ capture_hook(std::move(capture_hook_in)) {}
+ void mingle() override {
+ work_pool.sync();
+ async_resolver.wait_for_pending_resolves();
+ bool result = capture_hook();
+ for (size_t i = 0; i < size(); ++i) {
+ out(i) = result;
+ }
+ }
+};
+
+struct CaptureTask : FNET_Task {
+ CaptureMeet::SP meet;
+ CaptureTask(FNET_Scheduler *scheduler, CaptureMeet::SP meet_in)
+ : FNET_Task(scheduler), meet(std::move(meet_in)) {}
+ void PerformTask() override {
+ int dummy_value = 0; // rendezvous must have input value
+ if (meet->rendezvous(dummy_value)) {
+ ScheduleNow();
+ } else {
+ delete this;
+ }
+ };
+};
+
} // namespace <unnamed>
+namespace fnet {
+
+TimeTools::SP
+TimeTools::make_debug(vespalib::duration event_timeout,
+ std::function<vespalib::steady_time()> current_time)
+{
+ return std::make_shared<DebugTimeTools>(event_timeout, std::move(current_time));
+}
+
+} // fnet
+
TransportConfig::TransportConfig(int num_threads)
: _config(),
_resolver(),
_crypto(),
+ _time_tools(),
_num_threads(num_threads)
{}
@@ -39,14 +111,21 @@ vespalib::AsyncResolver::SP
TransportConfig::resolver() const {
return _resolver ? _resolver : vespalib::AsyncResolver::get_shared();
}
+
vespalib::CryptoEngine::SP
TransportConfig::crypto() const {
return _crypto ? _crypto : vespalib::CryptoEngine::get_default();
}
-FNET_Transport::FNET_Transport(TransportConfig cfg)
+fnet::TimeTools::SP
+TransportConfig::time_tools() const {
+ return _time_tools ? _time_tools : std::make_shared<DefaultTimeTools>();
+}
+
+FNET_Transport::FNET_Transport(const TransportConfig &cfg)
: _async_resolver(cfg.resolver()),
_crypto_engine(cfg.crypto()),
+ _time_tools(cfg.time_tools()),
_work_pool(std::make_unique<vespalib::ThreadStackExecutor>(1, 128_Ki, fnet_work_pool, 1024)),
_threads(),
_config(cfg.config())
@@ -175,6 +254,17 @@ FNET_Transport::Start(FastOS_ThreadPool *pool)
}
void
+FNET_Transport::attach_capture_hook(std::function<bool()> capture_hook)
+{
+ auto meet = std::make_shared<CaptureMeet>(_threads.size(), *_work_pool, *_async_resolver, std::move(capture_hook));
+ for (auto &thread: _threads) {
+ // tasks will be deleted when the capture_hook returns false
+ auto *task = new CaptureTask(thread->GetScheduler(), meet);
+ task->ScheduleNow();
+ }
+}
+
+void
FNET_Transport::Add(FNET_IOComponent *comp, bool needRef) {
comp->Owner()->Add(comp, needRef);
}
diff --git a/fnet/src/vespa/fnet/transport.h b/fnet/src/vespa/fnet/transport.h
index 6a59f9da66c..117bf6c93b8 100644
--- a/fnet/src/vespa/fnet/transport.h
+++ b/fnet/src/vespa/fnet/transport.h
@@ -8,6 +8,7 @@
#include <vector>
#include <vespa/vespalib/net/async_resolver.h>
#include <vespa/vespalib/net/crypto_engine.h>
+#include <vespa/vespalib/util/time.h>
class FNET_TransportThread;
class FastOS_ThreadPool;
@@ -17,6 +18,29 @@ class FNET_IServerAdapter;
class FNET_IPacketHandler;
class FNET_Scheduler;
+namespace fnet {
+
+/**
+ * Low-level abstraction for event-loop time management. The
+ * event_timeout function returns the timeout to be used when waiting
+ * for io-events. The current_time function returns the current
+ * time. This interface may be implemented to control both how time is
+ * spent (event_timeout) as well as how time is observed
+ * (current_time). The default implementation will use
+ * FNET_Scheduler::tick_ms as event timeout and
+ * vespalib::steady_clock::now() as current time.
+ **/
+struct TimeTools {
+ using SP = std::shared_ptr<TimeTools>;
+ virtual vespalib::duration event_timeout() const = 0;
+ virtual vespalib::steady_time current_time() const = 0;
+ virtual ~TimeTools() = default;
+ static TimeTools::SP make_debug(vespalib::duration event_timeout,
+ std::function<vespalib::steady_time()> current_time);
+};
+
+} // fnet
+
class TransportConfig {
public:
TransportConfig() : TransportConfig(1) {}
@@ -24,6 +48,7 @@ public:
~TransportConfig();
vespalib::AsyncResolver::SP resolver() const;
vespalib::CryptoEngine::SP crypto() const;
+ fnet::TimeTools::SP time_tools() const;
TransportConfig & resolver(vespalib::AsyncResolver::SP resolver_in) {
_resolver = std::move(resolver_in);
return *this;
@@ -32,6 +57,11 @@ public:
_crypto = std::move(crypto_in);
return *this;
}
+ TransportConfig &time_tools(fnet::TimeTools::SP time_tools_in) {
+ _time_tools = std::move(time_tools_in);
+ return *this;
+ }
+
const FNET_Config & config() const { return _config; }
uint32_t num_threads() const { return _num_threads; }
@@ -62,8 +92,10 @@ private:
FNET_Config _config;
vespalib::AsyncResolver::SP _resolver;
vespalib::CryptoEngine::SP _crypto;
+ fnet::TimeTools::SP _time_tools;
uint32_t _num_threads;
};
+
/**
* This class represents the transport layer and handles a collection
* of transport threads. Note: remember to shut down your transport
@@ -77,6 +109,7 @@ private:
vespalib::AsyncResolver::SP _async_resolver;
vespalib::CryptoEngine::SP _crypto_engine;
+ fnet::TimeTools::SP _time_tools;
std::unique_ptr<vespalib::SyncableThreadExecutor> _work_pool;
Threads _threads;
const FNET_Config _config;
@@ -91,7 +124,7 @@ public:
* the current thread become the transport thread. Main may only
* be called for single-threaded transports.
**/
- explicit FNET_Transport(TransportConfig config);
+ explicit FNET_Transport(const TransportConfig &config);
explicit FNET_Transport(uint32_t num_threads)
: FNET_Transport(TransportConfig(num_threads)) {}
@@ -100,6 +133,7 @@ public:
~FNET_Transport();
const FNET_Config & getConfig() const { return _config; }
+ const fnet::TimeTools &time_tools() const { return *_time_tools; }
/**
* Try to execute the given task on the internal work pool
@@ -284,6 +318,26 @@ public:
**/
bool Start(FastOS_ThreadPool *pool);
+ /**
+ * Capture transport threads. Used for testing purposes,
+ * preferably combined with a debug variant of TimeTools.
+ *
+ * After this function is called, the capture_hook will be called
+ * repeatedly as long as it returns true. The first time it
+ * returns false, appropriate cleanup will be performed and the
+ * capture_hook will never be called again; it detaches
+ * itself. All transport threads will be blocked while the
+ * capture_hook is called. Between calls to the capture_hook each
+ * transport thread will run its event loop exactly once, all
+ * pending work in the work pool will be performed and all pending
+ * dns lookups will be performed. Note that the capture_hook
+ * should detach itself by returning false before the transport
+ * itself is shut down.
+ *
+ * @param capture_hook called until it returns false
+ **/
+ void attach_capture_hook(std::function<bool()> capture_hook);
+
//-------------------------------------------------------------------------
// forward async IO Component operations to their owners
//-------------------------------------------------------------------------
diff --git a/fnet/src/vespa/fnet/transport_debugger.cpp b/fnet/src/vespa/fnet/transport_debugger.cpp
new file mode 100644
index 00000000000..1878b921171
--- /dev/null
+++ b/fnet/src/vespa/fnet/transport_debugger.cpp
@@ -0,0 +1,69 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "transport_debugger.h"
+#include <vespa/vespalib/util/require.h>
+#include <cassert>
+
+namespace fnet {
+
+void
+TransportDebugger::Meet::mingle()
+{
+ bool call_again = true;
+ for (size_t i = 0; i < size(); ++i) {
+ if (!in(i)) {
+ call_again = false;
+ }
+ }
+ for (size_t i = 0; i < size(); ++i) {
+ out(i) = call_again;
+ }
+}
+
+TransportDebugger::TransportDebugger()
+ : _time(),
+ _meet()
+{
+}
+
+TransportDebugger::~TransportDebugger()
+{
+ assert(!_meet && "error: still attached");
+}
+
+void
+TransportDebugger::attach(std::initializer_list<std::reference_wrapper<FNET_Transport> > list)
+{
+ size_t N = list.size() + 1;
+ REQUIRE(!_meet);
+ REQUIRE(N > 1);
+ _meet = std::make_shared<Meet>(N);
+ for (auto &item: list) {
+ item.get().attach_capture_hook([meet = _meet]()
+ {
+ REQUIRE(meet->rendezvous(true));
+ // capture point: between meetings
+ return meet->rendezvous(true);
+ });
+ }
+ REQUIRE(_meet->rendezvous(true)); // capture transport threads
+}
+
+void
+TransportDebugger::step()
+{
+ REQUIRE(_meet);
+ _time += 5ms; // pretend 5ms passes between each event loop iteration
+ REQUIRE(_meet->rendezvous(true)); // release transport threads
+ REQUIRE(_meet->rendezvous(true)); // capture transport threads
+}
+
+void
+TransportDebugger::detach()
+{
+ REQUIRE(_meet);
+ REQUIRE(!_meet->rendezvous(false)); // release transport threads (final time)
+ _meet.reset();
+}
+
+}
diff --git a/fnet/src/vespa/fnet/transport_debugger.h b/fnet/src/vespa/fnet/transport_debugger.h
new file mode 100644
index 00000000000..ed3738bb9fe
--- /dev/null
+++ b/fnet/src/vespa/fnet/transport_debugger.h
@@ -0,0 +1,60 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "transport.h"
+#include <vespa/vespalib/util/rendezvous.h>
+#include <vespa/vespalib/util/time.h>
+#include <memory>
+
+namespace fnet {
+
+/**
+ * This class is used to control transport threads during unit
+ * testing.
+ *
+ * The TimeTools created by this class should be used when setting up
+ * all transports used in the test. The supplied TimeTools will make
+ * sure no thread ever blocks waiting for io-events and also make sure
+ * all threads observe the same externally controlled current
+ * time. After the transport layers are started, the attach function
+ * is used to start controlling event loop execution. While attached,
+ * calling the step function will run each transport thread event loop
+ * exactly once (in parallel), wait for pending dns resolving, wait
+ * for pending tls handshake work and advance the current time with
+ * 5ms (making sure 'time passes' and 'stuff happens' at a reasonable
+ * relative rate). It is important to call detach to release the
+ * transports before trying to shut them down.
+ *
+ * Note that both server and client should be controlled by the same
+ * debugger when testing rpc. Using external services will result in
+ * (synthetic) time passing too fast compared to stuff actually
+ * happening, since you do not control the other end-point in the same
+ * way.
+ *
+ * Take a look at the unit test for this class for an example of how
+ * to use it.
+ **/
+class TransportDebugger
+{
+private:
+ struct Meet : vespalib::Rendezvous<bool,bool> {
+ Meet(size_t N) : vespalib::Rendezvous<bool,bool>(N) {}
+ void mingle() override;
+ };
+ vespalib::steady_time _time;
+ std::shared_ptr<Meet> _meet;
+
+public:
+ TransportDebugger();
+ ~TransportDebugger();
+ vespalib::steady_time time() const { return _time; }
+ TimeTools::SP time_tools() {
+ return TimeTools::make_debug(vespalib::duration::zero(), [this]() noexcept { return time(); });
+ }
+ void attach(std::initializer_list<std::reference_wrapper<FNET_Transport> > list);
+ void step();
+ void detach();
+};
+
+}
diff --git a/fnet/src/vespa/fnet/transport_thread.cpp b/fnet/src/vespa/fnet/transport_thread.cpp
index 5e1a9759a60..760c951fedd 100644
--- a/fnet/src/vespa/fnet/transport_thread.cpp
+++ b/fnet/src/vespa/fnet/transport_thread.cpp
@@ -207,7 +207,7 @@ extern "C" {
FNET_TransportThread::FNET_TransportThread(FNET_Transport &owner_in)
: _owner(owner_in),
- _now(steady_clock ::now()),
+ _now(owner_in.time_tools().current_time()),
_scheduler(&_now),
_componentsHead(nullptr),
_timeOutHead(nullptr),
@@ -246,6 +246,12 @@ FNET_TransportThread::getConfig() const {
return _owner.getConfig();
}
+const fnet::TimeTools &
+FNET_TransportThread::time_tools() const
+{
+ return _owner.time_tools();
+}
+
bool
FNET_TransportThread::tune(SocketHandle &handle) const
{
@@ -388,7 +394,7 @@ FNET_TransportThread::InitEventLoop()
LOG(error, "Transport: InitEventLoop: object already active!");
return false;
}
- _now = steady_clock::now();
+ _now = time_tools().current_time();
return true;
}
@@ -465,12 +471,12 @@ bool
FNET_TransportThread::EventLoopIteration() {
if (!IsShutDown()) {
- int msTimeout = FNET_Scheduler::tick_ms.count();
+ int msTimeout = vespalib::count_ms(time_tools().event_timeout());
// obtain I/O events
_selector.poll(msTimeout);
// sample current time (performed once per event loop iteration)
- _now = steady_clock::now();
+ _now = time_tools().current_time();
// handle io-events
auto dispatchResult = _selector.dispatch(*this);
diff --git a/fnet/src/vespa/fnet/transport_thread.h b/fnet/src/vespa/fnet/transport_thread.h
index 371544f9c8a..3c00a3fe93b 100644
--- a/fnet/src/vespa/fnet/transport_thread.h
+++ b/fnet/src/vespa/fnet/transport_thread.h
@@ -14,6 +14,7 @@
#include <condition_variable>
#include <chrono>
+namespace fnet { struct TimeTools; }
class FNET_Transport;
class FNET_ControlPacket;
class FNET_IPacketStreamer;
@@ -138,7 +139,7 @@ private:
* @return config object.
**/
const FNET_Config & getConfig() const;
-
+ const fnet::TimeTools &time_tools() const;
void handle_add_cmd(FNET_IOComponent *ioc);
void handle_close_cmd(FNET_IOComponent *ioc);
diff --git a/slobrok/src/tests/local_rpc_monitor_map/local_rpc_monitor_map_test.cpp b/slobrok/src/tests/local_rpc_monitor_map/local_rpc_monitor_map_test.cpp
index 46f6837c4f1..8624e38c787 100644
--- a/slobrok/src/tests/local_rpc_monitor_map/local_rpc_monitor_map_test.cpp
+++ b/slobrok/src/tests/local_rpc_monitor_map/local_rpc_monitor_map_test.cpp
@@ -104,7 +104,7 @@ struct LocalRpcMonitorMapTest : public ::testing::Test {
ServiceMapping mapping_conflict;
LocalRpcMonitorMapTest()
: time(duration::zero()),
- scheduler(&time, &time), monitor_log(), map_log(),
+ scheduler(&time), monitor_log(), map_log(),
map(&scheduler, [this](auto &owner)
{
EXPECT_EQ(&owner, &map);