summaryrefslogtreecommitdiffstats
path: root/fnet
diff options
context:
space:
mode:
authorHåvard Pettersen <havardpe@oath.com>2018-09-21 10:26:43 +0000
committerHåvard Pettersen <havardpe@oath.com>2018-09-21 10:26:43 +0000
commit77d945c559b9bf0e560af796a5f0a9303c860e10 (patch)
treeffedbe6f181cf7223ed56fbab7aefee283a0f6d9 /fnet
parent59f42ff1e807e60361d13663e4039038cd239120 (diff)
improve rpc invocation test
Diffstat (limited to 'fnet')
-rw-r--r--fnet/src/tests/frt/rpc/invoke.cpp907
1 files changed, 212 insertions, 695 deletions
diff --git a/fnet/src/tests/frt/rpc/invoke.cpp b/fnet/src/tests/frt/rpc/invoke.cpp
index e3bd662214f..dd08f365d58 100644
--- a/fnet/src/tests/frt/rpc/invoke.cpp
+++ b/fnet/src/tests/frt/rpc/invoke.cpp
@@ -1,100 +1,92 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <vespa/vespalib/testkit/test_kit.h>
+#include <vespa/vespalib/net/socket_spec.h>
+#include <vespa/vespalib/util/benchmark_timer.h>
#include <vespa/fnet/frt/frt.h>
#include <mutex>
#include <condition_variable>
-//-------------------------------------------------------------
+using vespalib::SocketSpec;
+using vespalib::BenchmarkTimer;
-#include "my_crypto_engine.hpp"
-vespalib::CryptoEngine::SP crypto;
+constexpr double timeout = 60.0;
+constexpr double short_timeout = 0.1;
//-------------------------------------------------------------
-std::mutex _delayedReturnCntLock;
-uint32_t _delayedReturnCnt = 0;
-
-uint32_t _phase_simple_cnt = 0;
-uint32_t _phase_void_cnt = 0;
-uint32_t _phase_speed_cnt = 0;
-uint32_t _phase_advanced_cnt = 0;
-uint32_t _phase_error_cnt = 0;
-uint32_t _phase_timeout_cnt = 0;
-uint32_t _phase_abort_cnt = 0;
-uint32_t _phase_echo_cnt = 0;
+#include "my_crypto_engine.hpp"
+vespalib::CryptoEngine::SP crypto;
//-------------------------------------------------------------
-struct LockedReqWait : public FRT_IRequestWait
-{
- std::mutex _condLock; // cond used to signal req done
- std::condition_variable _cond; // cond used to signal req done
- bool _done; // flag indicating req done
-
- std::mutex _lockLock; // lock protecting virtual lock
- bool _lock; // virtual lock
- bool _wasLocked; // was 'locked' when req done
-
- LockedReqWait() : _cond(), _done(false), _lockLock(), _lock(false), _wasLocked(false) {}
- ~LockedReqWait() {}
-
- void lock() {
- std::lock_guard<std::mutex> guard(_lockLock);
- _lock = true;
- }
-
- void unlock() {
- std::lock_guard<std::mutex> guard(_lockLock);
- _lock = false;
- }
-
- bool isLocked() {
- std::lock_guard<std::mutex> guard(_lockLock);
- return _lock;
+class RequestLatch : public FRT_IRequestWait {
+private:
+ FRT_RPCRequest *_req;
+ std::mutex _lock;
+ std::condition_variable _cond;
+public:
+ RequestLatch() : _req(nullptr), _lock(), _cond() {}
+ ~RequestLatch() { ASSERT_TRUE(_req == nullptr); }
+ bool has_req() {
+ std::lock_guard guard(_lock);
+ return (_req != nullptr);
}
-
- void RequestDone(FRT_RPCRequest *) override {
- _wasLocked = isLocked();
- std::lock_guard<std::mutex> guard(_condLock);
- _done = true;
- _cond.notify_one();
+ FRT_RPCRequest *read() {
+ std::unique_lock guard(_lock);
+ _cond.wait(guard, [&req = _req]{ return (req != nullptr); });
+ auto ret = _req;
+ _req = nullptr;
+ _cond.notify_all();
+ return ret;
}
-
- void waitReq() {
- std::unique_lock<std::mutex> guard(_condLock);
- while(!_done) {
- _cond.wait(guard);
- }
+ void write(FRT_RPCRequest *req) {
+ std::unique_lock guard(_lock);
+ _cond.wait(guard, [&req = _req]{ return (req == nullptr); });
+ _req = req;
+ _cond.notify_all();
}
+ void RequestDone(FRT_RPCRequest *req) override { write(req); }
};
//-------------------------------------------------------------
-class DelayedReturn : public FNET_Task
-{
+class MyReq {
private:
FRT_RPCRequest *_req;
-
- DelayedReturn(const DelayedReturn &);
- DelayedReturn &operator=(const DelayedReturn &);
-
public:
- DelayedReturn(FNET_Scheduler *sched, FRT_RPCRequest *req, double delay)
- : FNET_Task(sched),
- _req(req)
+ MyReq(FRT_RPCRequest *req) : _req(req) {}
+ MyReq(const char *method_name)
+ : _req(new FRT_RPCRequest())
{
- {
- std::lock_guard<std::mutex> guard(_delayedReturnCntLock);
- _delayedReturnCnt++;
- }
- Schedule(delay);
+ _req->SetMethodName(method_name);
}
-
- void PerformTask() override
+ MyReq(uint32_t value, bool async, uint32_t error, uint8_t extra)
+ : _req(new FRT_RPCRequest())
{
- _req->Return();
- std::lock_guard<std::mutex> guard(_delayedReturnCntLock);
- _delayedReturnCnt--;
+ _req->SetMethodName("test");
+ _req->GetParams()->AddInt32(value);
+ _req->GetParams()->AddInt32(error);
+ _req->GetParams()->AddInt8(extra);
+ _req->GetParams()->AddInt8((async) ? 1 : 0);
+ }
+ ~MyReq() {
+ if (_req != nullptr) {
+ _req->SubRef();
+ }
+ }
+ MyReq(const MyReq &rhs) = delete;
+ MyReq &operator=(const MyReq &rhs) = delete;
+ FRT_RPCRequest &get() { return *_req; }
+ FRT_RPCRequest *borrow() { return _req; }
+ FRT_RPCRequest *steal() {
+ auto ret = _req;
+ _req = nullptr;
+ return ret;
+ }
+ uint32_t get_int_ret() {
+ ASSERT_TRUE(_req != nullptr);
+ ASSERT_TRUE(_req->CheckReturnTypes("i"));
+ return _req->GetReturn()->GetValue(0)._intval32;
}
};
@@ -103,31 +95,22 @@ public:
class EchoTest : public FRT_Invokable
{
private:
- vespalib::Stash *_echo_stash;
- FRT_Values *_echo_args;
+ vespalib::Stash _echo_stash;
+ FRT_Values _echo_args;
EchoTest(const EchoTest &);
EchoTest &operator=(const EchoTest &);
public:
- EchoTest() : _echo_stash(nullptr), _echo_args(nullptr) {}
- ~EchoTest()
+ EchoTest(FRT_Supervisor *supervisor)
+ : _echo_stash(),
+ _echo_args(_echo_stash)
{
- delete _echo_args;
- delete _echo_stash;
- }
-
- void Init(FRT_Supervisor *supervisor)
- {
- _echo_stash = new vespalib::Stash();
- _echo_args = new FRT_Values(*_echo_stash);
- assert(_echo_stash != nullptr && _echo_args != nullptr);
-
FRT_ReflectionBuilder rb(supervisor);
rb.DefineMethod("echo", "*", "*",
FRT_METHOD(EchoTest::RPC_Echo), this);
- FRT_Values *args = _echo_args;
+ FRT_Values *args = &_echo_args;
args->EnsureFree(16);
args->AddInt8(8);
@@ -179,15 +162,14 @@ public:
args->SetData(&pt_data[2], "dat3", 4);
}
- bool PrepareEchoReq(FRT_RPCRequest *req)
+ bool prepare_params(FRT_RPCRequest &req)
{
FNET_DataBuffer buf;
- req->SetMethodName("echo");
- _echo_args->EncodeCopy(&buf);
- req->GetParams()->DecodeCopy(&buf, buf.GetDataLen());
- return (req->GetParams()->Equals(_echo_args) &&
- _echo_args->Equals(req->GetParams()));
+ _echo_args.EncodeCopy(&buf);
+ req.GetParams()->DecodeCopy(&buf, buf.GetDataLen());
+ return (req.GetParams()->Equals(&_echo_args) &&
+ _echo_args.Equals(req.GetParams()));
}
void RPC_Echo(FRT_RPCRequest *req)
@@ -196,7 +178,7 @@ public:
req->GetParams()->EncodeCopy(&buf);
req->GetReturn()->DecodeCopy(&buf, buf.GetDataLen());
- if (!req->GetReturn()->Equals(_echo_args) ||
+ if (!req->GetReturn()->Equals(&_echo_args) ||
!req->GetReturn()->Equals(req->GetParams()))
{
req->SetError(10000, "Streaming error");
@@ -209,19 +191,16 @@ public:
class TestRPC : public FRT_Invokable
{
private:
- FRT_Supervisor *_supervisor;
- FNET_Scheduler *_scheduler;
uint32_t _intValue;
+ RequestLatch _detached_req;
TestRPC(const TestRPC &);
TestRPC &operator=(const TestRPC &);
public:
- TestRPC(FRT_Supervisor *supervisor, // server supervisor
- FNET_Scheduler *scheduler) // client scheduler
- : _supervisor(supervisor),
- _scheduler(scheduler),
- _intValue(0)
+ TestRPC(FRT_Supervisor *supervisor)
+ : _intValue(0),
+ _detached_req()
{
FRT_ReflectionBuilder rb(supervisor);
@@ -233,7 +212,7 @@ public:
FRT_METHOD(TestRPC::RPC_IncValue), this);
rb.DefineMethod("getValue", "", "i",
FRT_METHOD(TestRPC::RPC_GetValue), this);
- rb.DefineMethod("testFast", "iiibb", "i",
+ rb.DefineMethod("test", "iibb", "i",
FRT_METHOD(TestRPC::RPC_Test), this);
}
@@ -241,10 +220,9 @@ public:
{
FRT_Values &param = *req->GetParams();
uint32_t value = param[0]._intval32;
- uint32_t delay = param[1]._intval32;
- uint32_t error = param[2]._intval32;
- uint8_t extra = param[3]._intval8;
- uint8_t async = param[4]._intval8;
+ uint32_t error = param[1]._intval32;
+ uint8_t extra = param[2]._intval8;
+ uint8_t async = param[3]._intval8;
req->GetReturn()->AddInt32(value);
if (extra != 0) {
@@ -254,49 +232,7 @@ public:
req->SetError(error);
}
if (async != 0) {
- req->Detach();
- if (delay == 0) {
- req->Return();
- } else {
- req->getStash().create<DelayedReturn>(_scheduler, req, ((double)delay) / 1000.0);
- }
- } else {
-
- if (delay > 0) {
-
- const char *suffix = "testFast";
- uint32_t suffix_len = strlen(suffix);
- uint32_t name_len = req->GetMethodNameLen();
- bool remote = req->GetContext()._value.VOIDP != nullptr;
- bool instant = name_len > suffix_len &&
- strcmp(req->GetMethodName() + name_len - suffix_len, suffix) == 0;
-
- if (remote && instant) {
-
- // block, but don't cripple server scheduler...
- // (NB: in 'real life', instant methods should never block)
-
- FastOS_TimeInterface *now = _supervisor->GetTransport()->GetTimeSampler();
- FNET_Scheduler *scheduler = _supervisor->GetScheduler();
- assert(scheduler->GetTimeSampler() == now);
-
- while (delay > 0) {
- if (delay > 20) {
- FastOS_Thread::Sleep(20);
- delay -= 20;
- } else {
- FastOS_Thread::Sleep(delay);
- delay = 0;
- }
- now->SetNow();
- scheduler->CheckTasks();
- }
-
- } else {
-
- FastOS_Thread::Sleep(delay);
- }
- }
+ _detached_req.write(req->Detach());
}
}
@@ -320,607 +256,188 @@ public:
{
req->GetReturn()->AddInt32(_intValue);
}
-};
-//-------------------------------------------------------------
-
-enum {
- OK_RET = 0,
- BOGUS_RET = 1
-};
-
-enum {
- PHASE_NULL = 0,
- PHASE_SETUP,
- PHASE_SIMPLE,
- PHASE_VOID,
- PHASE_SPEED,
- PHASE_ADVANCED,
- PHASE_ERROR,
- PHASE_TIMEOUT,
- PHASE_ABORT,
- PHASE_ECHO,
- PHASE_SHUTDOWN,
- PHASE_ZZZ
-};
-
-const char phase_names[PHASE_ZZZ][32] =
-{
- "nullptr",
- "SETUP",
- "SIMPLE",
- "VOID",
- "SPEED",
- "ADVANCED",
- "ERROR",
- "TIMEOUT",
- "ABORT",
- "ECHO",
- "SHUTDOWN"
-};
-
-enum {
- TIMING_NULL = 0,
- TIMING_INSTANT,
- TIMING_ZZZ
-};
-
-const char timing_names[TIMING_ZZZ][32] =
-{
- "nullptr",
- "INSTANT",
-};
-
-enum {
- HANDLING_NULL = 0,
- HANDLING_SYNC,
- HANDLING_ASYNC,
- HANDLING_ZZZ
-};
-
-const char handling_names[HANDLING_ZZZ][32] =
-{
- "nullptr",
- "SYNC",
- "ASYNC"
+ RequestLatch &detached_req() { return _detached_req; }
};
//-------------------------------------------------------------
-struct State {
+class Fixture
+{
+private:
FRT_Supervisor _client;
FRT_Supervisor _server;
- TestRPC _rpc;
- EchoTest _echo;
- std::string _peerSpec;
- uint32_t _testPhase;
- uint32_t _timing;
- uint32_t _handling;
- double _timeout;
+ vespalib::string _peerSpec;
FRT_Target *_target;
- FRT_RPCRequest *_req;
+ TestRPC _testRPC;
+ EchoTest _echoTest;
+
+public:
+ FRT_Target &target() { return *_target; }
+ FRT_Target *make_bad_target() { return _client.GetTarget("bogus address"); }
+ RequestLatch &detached_req() { return _testRPC.detached_req(); }
+ EchoTest &echo() { return _echoTest; }
- State()
+ Fixture()
: _client(crypto),
_server(crypto),
- _rpc(&_server, _client.GetScheduler()),
- _echo(),
_peerSpec(),
- _testPhase(PHASE_NULL),
- _timing(TIMING_NULL),
- _handling(HANDLING_NULL),
- _timeout(5.0),
_target(nullptr),
- _req(nullptr)
+ _testRPC(&_server),
+ _echoTest(&_server)
{
_client.GetTransport()->SetTCPNoDelay(true);
_server.GetTransport()->SetTCPNoDelay(true);
- _echo.Init(&_server);
+ ASSERT_TRUE(_server.Listen("tcp/0"));
+ ASSERT_TRUE(_server.Start());
+ ASSERT_TRUE(_client.Start());
+ _peerSpec = SocketSpec::from_host_port("localhost", _server.GetListenPort()).spec();
+ _target = _client.GetTarget(_peerSpec.c_str());
+ //---------------------------------------------------------------------
+ MyReq req("frt.rpc.ping");
+ target().InvokeSync(req.borrow(), timeout);
+ ASSERT_TRUE(!req.get().IsError());
}
- void SetTimeout(double timeout)
- {
- _timeout = timeout;
+ ~Fixture() {
+ _client.ShutDown(true);
+ _server.ShutDown(true);
+ _target->SubRef();
}
+};
- void NewReq()
- {
- if (_req != nullptr) {
- _req->SubRef();
- }
- _req = new FRT_RPCRequest();
- }
+//-------------------------------------------------------------
- void FreeReq()
+TEST_F("require that simple invocation works", Fixture()) {
+ MyReq req("inc");
+ req.get().GetParams()->AddInt32(502);
+ f1.target().InvokeSync(req.borrow(), timeout);
+ EXPECT_EQUAL(req.get_int_ret(), 503u);
+}
+
+TEST_F("require that void invocation works", Fixture()) {
{
- if (_req != nullptr) {
- _req->SubRef();
- }
- _req = nullptr;
+ MyReq req("setValue");
+ req.get().GetParams()->AddInt32(40);
+ f1.target().InvokeSync(req.borrow(), timeout);
+ EXPECT_TRUE(req.get().CheckReturnTypes(""));
}
-
- void LostReq()
{
- _req = nullptr;
+ MyReq req("incValue");
+ f1.target().InvokeVoid(req.steal());
}
-
- void PrepareTestMethod()
{
- NewReq();
- if (_timing != TIMING_INSTANT) {
- ASSERT_TRUE(false); // consult your dealer...
- }
- _req->SetMethodName("testFast");
+ MyReq req("incValue");
+ f1.target().InvokeVoid(req.steal());
}
-
- void SetTestParams(uint32_t value, uint32_t delay,
- uint32_t error = FRTE_NO_ERROR,
- uint8_t extra = 0)
{
- _req->GetParams()->AddInt32(value);
- _req->GetParams()->AddInt32(delay);
- _req->GetParams()->AddInt32(error);
- _req->GetParams()->AddInt8(extra);
- bool async = (_handling == HANDLING_ASYNC);
- if (_handling != HANDLING_SYNC &&
- _handling != HANDLING_ASYNC)
- {
- ASSERT_TRUE(false); // consult your dealer...
- }
- _req->GetParams()->AddInt8((async) ? 1 : 0);
+ MyReq req("getValue");
+ f1.target().InvokeSync(req.borrow(), timeout);
+ EXPECT_EQUAL(req.get_int_ret(), 42u);
}
-
- void InvokeSync();
- void InvokeVoid();
- void InvokeAsync(FRT_IRequestWait *w);
- void InvokeTest(uint32_t value,
- uint32_t delay = 0,
- uint32_t error = FRTE_NO_ERROR,
- uint8_t extra = 0);
- void InvokeTestAndAbort(uint32_t value,
- uint32_t delay = 0,
- uint32_t error = FRTE_NO_ERROR,
- uint8_t extra = 0);
- bool WaitForDelayedReturnCount(uint32_t wantedCount, double timeout);
-
-private:
- State(const State &);
- State &operator=(const State &);
-};
-
-
-void
-State::InvokeSync()
-{
- _target->InvokeSync(_req, _timeout);
-}
-
-
-void
-State::InvokeVoid()
-{
- _target->InvokeVoid(_req);
-}
-
-
-void
-State::InvokeAsync(FRT_IRequestWait *w)
-{
- _target->InvokeAsync(_req, _timeout, w);
-}
-
-
-void
-State::InvokeTest(uint32_t value, uint32_t delay,
- uint32_t error, uint8_t extra)
-{
- PrepareTestMethod();
- SetTestParams(value, delay, error, extra);
- InvokeSync();
-}
-
-
-void
-State::InvokeTestAndAbort(uint32_t value, uint32_t delay,
- uint32_t error, uint8_t extra)
-{
- PrepareTestMethod();
- SetTestParams(value, delay, error, extra);
- FRT_SingleReqWait w;
- InvokeAsync(&w);
- _req->Abort();
- w.WaitReq();
}
-bool
-State::WaitForDelayedReturnCount(uint32_t wantedCount, double timeout)
-{
- FastOS_Time timer;
- timer.SetNow();
- for (;;) {
- uint32_t delayedReturnCnt;
+TEST_F("measure minimal invocation latency", Fixture()) {
+ size_t cnt = 0;
+ uint32_t val = 0;
+ BenchmarkTimer timer(1.0);
+ while (timer.has_budget()) {
+ timer.before();
{
- std::lock_guard<std::mutex> guard(_delayedReturnCntLock);
- delayedReturnCnt = _delayedReturnCnt;
- }
- if (delayedReturnCnt == wantedCount) {
- return true;
- }
- if ((timer.MilliSecsToNow() / 1000.0) > timeout) {
- return false;
+ MyReq req("inc");
+ req.get().GetParams()->AddInt32(val);
+ f1.target().InvokeSync(req.borrow(), timeout);
+ ASSERT_TRUE(!req.get().IsError());
+ val = req.get_int_ret();
+ ++cnt;
}
- FastOS_Thread::Sleep(10);
+ timer.after();
}
+ EXPECT_EQUAL(cnt, val);
+ double t = timer.min_time();
+ fprintf(stderr, "latency of invocation: %1.3f ms\n", t * 1000.0);
}
-//-------------------------------------------------------------
-
-bool CheckTypes(FRT_RPCRequest *req, const char *spec) {
- return FRT_Values::CheckTypes(spec, req->GetReturnSpec());
-}
-
-FRT_Value &Get(FRT_RPCRequest *req, uint32_t idx) {
- return req->GetReturn()->GetValue(idx);
-}
-
-//-------------------------------------------------------------
-
-void TestSetup(State *_state) {
- ASSERT_TRUE(_state->_testPhase == PHASE_SETUP);
-
- bool listenOK = _state->_server.Listen("tcp/0");
-
- char spec[64];
- sprintf(spec, "tcp/localhost:%d", _state->_server.GetListenPort());
- _state->_peerSpec = spec;
-
- bool serverStartOK = _state->_server.Start();
- bool clientStartOK = _state->_client.Start();
-
- ASSERT_TRUE(listenOK);
- ASSERT_TRUE(serverStartOK);
- ASSERT_TRUE(clientStartOK);
-
- _state->_target = _state->_client.GetTarget(_state->_peerSpec.c_str());
- _state->NewReq();
- _state->_req->SetMethodName("frt.rpc.ping");
- _state->_target->InvokeSync(_state->_req, 5.0);
- ASSERT_TRUE(!_state->_req->IsError());
-}
-
-
-void TestSimple(State *_state) {
- ASSERT_TRUE(_state->_testPhase == PHASE_SIMPLE);
- _phase_simple_cnt++;
- _state->NewReq();
- _state->_req->SetMethodName("inc");
- _state->_req->GetParams()->AddInt32(502);
- _state->InvokeSync();
- EXPECT_TRUE(!_state->_req->IsError() &&
- CheckTypes(_state->_req, "i") &&
- Get(_state->_req, 0)._intval32 == 503);
+TEST_F("require that abort has no effect on a completed request", Fixture()) {
+ MyReq req(42, false, FRTE_NO_ERROR, 0);
+ f1.target().InvokeSync(req.borrow(), timeout);
+ EXPECT_EQUAL(req.get_int_ret(), 42u);
+ req.get().Abort();
+ EXPECT_EQUAL(req.get_int_ret(), 42u);
}
-
-void TestVoid(State *_state) {
- ASSERT_TRUE(_state->_testPhase == PHASE_VOID);
- _phase_void_cnt++;
-
- _state->NewReq();
- _state->_req->SetMethodName("setValue");
- _state->_req->GetParams()->AddInt32(40);
- _state->InvokeSync();
- EXPECT_TRUE(!_state->_req->IsError() &&
- CheckTypes(_state->_req, ""));
-
- _state->NewReq();
- _state->_req->SetMethodName("incValue");
- _state->InvokeVoid();
- _state->LostReq();
-
- _state->NewReq();
- _state->_req->SetMethodName("incValue");
- _state->InvokeVoid();
- _state->LostReq();
-
- _state->NewReq();
- _state->_req->SetMethodName("getValue");
- _state->InvokeSync();
- EXPECT_TRUE(!_state->_req->IsError() &&
- CheckTypes(_state->_req, "i") &&
- Get(_state->_req, 0)._intval32 == 42);
+TEST_F("require that a request can be responded to at a later time", Fixture()) {
+ RequestLatch result;
+ MyReq req(42, true, FRTE_NO_ERROR, 0);
+ f1.target().InvokeAsync(req.steal(), timeout, &result);
+ EXPECT_TRUE(!result.has_req());
+ f1.detached_req().read()->Return();
+ MyReq ret(result.read());
+ EXPECT_EQUAL(ret.get_int_ret(), 42u);
}
-
-void TestSpeed(State *_state) {
- ASSERT_TRUE(_state->_testPhase == PHASE_SPEED);
- _phase_speed_cnt++;
-
- FastOS_Time start;
- FastOS_Time stop;
- uint32_t val = 0;
- uint32_t cnt = 0;
-
- _state->NewReq();
- FRT_RPCRequest *req = _state->_req;
- FRT_Target *target = _state->_target;
-
- // calibrate cnt to be used
- start.SetNow();
- for (cnt = 0; cnt < 1000000; cnt++) {
- req->SetMethodName("inc");
- req->GetParams()->AddInt32(0);
- target->InvokeSync(req, 5.0);
- if (req->IsError()) {
- break;
- }
- req->Reset(); // ok if no error
- if (start.MilliSecsToNow() > 20.0) {
- break;
- }
- }
- cnt = (cnt == 0) ? 1 : cnt * 10;
-
- fprintf(stderr, "checking invocation latency... (cnt = %d)\n", cnt);
-
- _state->NewReq();
- req = _state->_req;
-
- // actual benchmark
- start.SetNow();
- for (uint32_t i = 0; i < cnt; i++) {
- req->SetMethodName("inc");
- req->GetParams()->AddInt32(val);
- target->InvokeSync(req, 60.0);
- if (req->IsError()) {
- fprintf(stderr, "... rpc error(%d): %s\n",
- req->GetErrorCode(),
- req->GetErrorMessage());
- break;
- }
- val = req->GetReturn()->GetValue(0)._intval32;
- req->Reset(); // ok if no error
+TEST_F("require that a bad target gives connection error", Fixture()) {
+ MyReq req("frt.rpc.ping");
+ {
+ FRT_Target *bad_target = f1.make_bad_target();
+ bad_target->InvokeSync(req.borrow(), timeout);
+ bad_target->SubRef();
}
- stop.SetNow();
- stop -= start;
- double latency = stop.MilliSecs() / (double) cnt;
-
- EXPECT_EQUAL(val, cnt);
- fprintf(stderr, "latency of invocation: %1.3f ms\n", latency);
+ EXPECT_EQUAL(req.get().GetErrorCode(), FRTE_RPC_CONNECTION);
}
-
-void TestAdvanced(State *_state) {
- ASSERT_TRUE(_state->_testPhase == PHASE_ADVANCED);
- _phase_advanced_cnt++;
-
- // Test invocation
- //----------------
- _state->InvokeTest(42);
- EXPECT_TRUE(!_state->_req->IsError() &&
- CheckTypes(_state->_req, "i") &&
- Get(_state->_req, 0)._intval32 == 42);
-
- // Abort has no effect after request is done
- //------------------------------------------
- _state->_req->Abort();
- EXPECT_TRUE(!_state->_req->IsError() &&
- CheckTypes(_state->_req, "i") &&
- Get(_state->_req, 0)._intval32 == 42);
-
- // Test invocation with delay
- //---------------------------
- _state->InvokeTest(58, 100);
- EXPECT_TRUE(!_state->_req->IsError() &&
- CheckTypes(_state->_req, "i") &&
- Get(_state->_req, 0)._intval32 == 58);
+TEST_F("require that non-existing method gives appropriate error", Fixture()) {
+ MyReq req("bogus");
+ f1.target().InvokeSync(req.borrow(), timeout);
+ EXPECT_EQUAL(req.get().GetErrorCode(), FRTE_RPC_NO_SUCH_METHOD);
}
-
-void TestError(State *_state) {
- ASSERT_TRUE(_state->_testPhase == PHASE_ERROR);
- _phase_error_cnt++;
-
- // bad target -> sync error -> avoid deadlock
- //-------------------------------------------
- if (_state->_handling == HANDLING_ASYNC)
- {
- // stash away valid target
- FRT_Target *stateTarget = _state->_target; // backup of valid target
-
- _state->_target = _state->_client.GetTarget("bogus address");
- _state->NewReq();
- _state->_req->SetMethodName("frt.rpc.ping");
- LockedReqWait lw;
- lw.lock();
- _state->InvokeAsync(&lw);
- lw.unlock();
- lw.waitReq();
- EXPECT_TRUE(!lw._wasLocked);
- EXPECT_TRUE(_state->_req->GetErrorCode() == FRTE_RPC_CONNECTION);
-
- // restore valid target
- _state->_target->SubRef();
- _state->_target = stateTarget;
- }
-
- // no such method
- //---------------
- if (_state->_timing == TIMING_INSTANT &&
- _state->_handling == HANDLING_SYNC)
- {
- _state->NewReq();
- _state->_req->SetMethodName("bogus");
- _state->InvokeSync();
- EXPECT_TRUE(_state->_req->GetErrorCode() == FRTE_RPC_NO_SUCH_METHOD);
- }
-
- // wrong params
- //-------------
- if (_state->_handling == HANDLING_SYNC) {
-
- _state->PrepareTestMethod();
- _state->InvokeSync();
- EXPECT_TRUE(_state->_req->GetErrorCode() == FRTE_RPC_WRONG_PARAMS);
-
- _state->PrepareTestMethod();
- _state->_req->GetParams()->AddInt32(42);
- _state->_req->GetParams()->AddInt32(0);
- _state->_req->GetParams()->AddInt8(0);
- _state->_req->GetParams()->AddInt8(0);
- _state->_req->GetParams()->AddInt8(0);
- _state->InvokeSync();
- EXPECT_TRUE(_state->_req->GetErrorCode() == FRTE_RPC_WRONG_PARAMS);
-
- _state->PrepareTestMethod();
- _state->_req->GetParams()->AddInt32(42);
- _state->_req->GetParams()->AddInt32(0);
- _state->_req->GetParams()->AddInt32(0);
- _state->_req->GetParams()->AddInt8(0);
- _state->_req->GetParams()->AddInt8(0);
- _state->_req->GetParams()->AddInt8(0);
- _state->InvokeSync();
- EXPECT_TRUE(_state->_req->GetErrorCode() == FRTE_RPC_WRONG_PARAMS);
- }
-
- // wrong return
- //-------------
- _state->InvokeTest(42, 0, 0, BOGUS_RET);
- EXPECT_TRUE(_state->_req->GetErrorCode() == FRTE_RPC_WRONG_RETURN);
-
- // method failed
- //--------------
- _state->InvokeTest(42, 0, 5000, BOGUS_RET);
- EXPECT_TRUE(_state->_req->GetErrorCode() == 5000);
+TEST_F("require that wrong parameter types give appropriate error", Fixture()) {
+ MyReq req("setValue");
+ req.get().GetParams()->AddString("40");
+ f1.target().InvokeSync(req.borrow(), timeout);
+ EXPECT_EQUAL(req.get().GetErrorCode(), FRTE_RPC_WRONG_PARAMS);
}
-
-void TestTimeout(State *_state) {
- ASSERT_TRUE(_state->_testPhase == PHASE_TIMEOUT);
- _phase_timeout_cnt++;
-
- _state->SetTimeout(0.1);
-
- // Test timeout
- //-------------
- _state->InvokeTest(123, 5000);
- EXPECT_TRUE(_state->_req->GetErrorCode() == FRTE_RPC_TIMEOUT);
- FastOS_Thread::Sleep(5500); // settle
-
- _state->SetTimeout(5.0);
+TEST_F("require that wrong return value types give appropriate error", Fixture()) {
+ MyReq req(42, false, FRTE_NO_ERROR, 1);
+ f1.target().InvokeSync(req.borrow(), timeout);
+ EXPECT_EQUAL(req.get().GetErrorCode(), FRTE_RPC_WRONG_RETURN);
}
-
-void TestAbort(State *_state) {
- ASSERT_TRUE(_state->_testPhase == PHASE_ABORT);
- _phase_abort_cnt++;
-
- // Test abort
- //-----------
- _state->InvokeTestAndAbort(456, 1000);
- EXPECT_TRUE(_state->_req->GetErrorCode() == FRTE_RPC_ABORT);
- FastOS_Thread::Sleep(1500); // settle
+TEST_F("require that the method itself can signal failure", Fixture()) {
+ MyReq req(42, false, 5000, 1);
+ f1.target().InvokeSync(req.borrow(), timeout);
+ EXPECT_EQUAL(req.get().GetErrorCode(), 5000u);
}
-
-void TestEcho(State *_state) {
- ASSERT_TRUE(_state->_testPhase == PHASE_ECHO);
- _phase_echo_cnt++;
-
- // Test echo
- //----------
- _state->NewReq();
- EXPECT_TRUE(_state->_echo.PrepareEchoReq(_state->_req));
- _state->InvokeSync();
- EXPECT_TRUE(!_state->_req->IsError());
- EXPECT_TRUE(_state->_req->GetReturn()->Equals(_state->_req->GetParams()));
+TEST_F("require that invocation can time out", Fixture()) {
+ RequestLatch result;
+ MyReq req(42, true, FRTE_NO_ERROR, 0);
+ f1.target().InvokeAsync(req.steal(), short_timeout, &result);
+ MyReq ret(result.read());
+ f1.detached_req().read()->Return();
+ EXPECT_EQUAL(ret.get().GetErrorCode(), FRTE_RPC_TIMEOUT);
}
+TEST_F("require that invocation can be aborted", Fixture()) {
+ RequestLatch result;
+ MyReq req(42, true, FRTE_NO_ERROR, 0);
+ FRT_RPCRequest *will_be_mine_again_soon = req.steal();
+ f1.target().InvokeAsync(will_be_mine_again_soon, timeout, &result);
+ will_be_mine_again_soon->Abort();
+ MyReq ret(result.read());
+ f1.detached_req().read()->Return();
+ EXPECT_EQUAL(ret.get().GetErrorCode(), FRTE_RPC_ABORT);
+}
-TEST_F("invoke test", State()) {
- State *_state = &f1;
-
- _state->_testPhase = PHASE_SETUP;
- TestSetup(_state);
-
- for (_state->_testPhase = PHASE_SIMPLE;
- _state->_testPhase < PHASE_SHUTDOWN;
- _state->_testPhase++) {
-
- {
- for (_state->_timing = TIMING_INSTANT;
- _state->_timing < TIMING_ZZZ;
- _state->_timing++) {
-
- for (_state->_handling = HANDLING_SYNC;
- _state->_handling < HANDLING_ZZZ;
- _state->_handling++) {
-
- switch (_state->_testPhase) {
- case PHASE_SIMPLE:
- if (_state->_timing == TIMING_INSTANT &&
- _state->_handling == HANDLING_SYNC)
- {
- TestSimple(_state);
- }
- break;
- case PHASE_VOID:
- if (_state->_timing == TIMING_INSTANT &&
- _state->_handling == HANDLING_SYNC)
- {
- TestVoid(_state);
- }
- break;
- case PHASE_SPEED:
- if (_state->_timing == TIMING_INSTANT &&
- _state->_handling == HANDLING_SYNC)
- {
- TestSpeed(_state);
- }
- break;
- case PHASE_ADVANCED:
- TestAdvanced(_state);
- break;
- case PHASE_ERROR:
- TestError(_state);
- break;
- case PHASE_TIMEOUT:
- TestTimeout(_state);
- break;
- case PHASE_ABORT:
- TestAbort(_state);
- break;
- case PHASE_ECHO:
- if (_state->_timing == TIMING_INSTANT &&
- _state->_handling == HANDLING_SYNC)
- {
- TestEcho(_state);
- }
- break;
- default:
- ASSERT_TRUE(false); // consult your dealer...
- }
- }
- }
- }
- }
- _state->_testPhase = PHASE_SHUTDOWN;
- _state->_timing = TIMING_NULL;
- _state->_handling = HANDLING_NULL;
- EXPECT_TRUE(_state->WaitForDelayedReturnCount(0, 120.0));
- _state->FreeReq();
- _state->_client.ShutDown(true);
- _state->_server.ShutDown(true);
- _state->_target->SubRef();
- _state->_target = nullptr;
- EXPECT_TRUE(_delayedReturnCnt == 0);
- EXPECT_TRUE(_phase_simple_cnt == 1);
- EXPECT_TRUE(_phase_void_cnt == 1);
- EXPECT_TRUE(_phase_speed_cnt == 1);
- EXPECT_TRUE(_phase_advanced_cnt == 2);
- EXPECT_TRUE(_phase_error_cnt == 2);
- EXPECT_TRUE(_phase_abort_cnt == 2);
- EXPECT_TRUE(_phase_echo_cnt == 1);
+TEST_F("require that parameters can be echoed as return values", Fixture()) {
+ MyReq req("echo");
+ ASSERT_TRUE(f1.echo().prepare_params(req.get()));
+ f1.target().InvokeSync(req.borrow(), timeout);
+ EXPECT_TRUE(!req.get().IsError());
+ EXPECT_TRUE(req.get().GetReturn()->Equals(req.get().GetParams()));
+ EXPECT_TRUE(req.get().GetParams()->Equals(req.get().GetReturn()));
}
TEST_MAIN() {