diff options
author | Jon Bratseth <bratseth@yahoo-inc.com> | 2016-06-15 23:09:44 +0200 |
---|---|---|
committer | Jon Bratseth <bratseth@yahoo-inc.com> | 2016-06-15 23:09:44 +0200 |
commit | 72231250ed81e10d66bfe70701e64fa5fe50f712 (patch) | |
tree | 2728bba1131a6f6e5bdf95afec7d7ff9358dac50 /fnet/src/examples |
Publish
Diffstat (limited to 'fnet/src/examples')
23 files changed, 1710 insertions, 0 deletions
diff --git a/fnet/src/examples/frt/rpc/.gitignore b/fnet/src/examples/frt/rpc/.gitignore new file mode 100644 index 00000000000..91218a0868b --- /dev/null +++ b/fnet/src/examples/frt/rpc/.gitignore @@ -0,0 +1,16 @@ +.depend +Makefile +echo_client +echo_server +rpc_callback_client +rpc_callback_server +rpc_client +rpc_info +rpc_invoke +rpc_proxy +rpc_server +fnet_echo_client_app +fnet_rpc_callback_client_app +fnet_rpc_callback_server_app +fnet_rpc_client_app +fnet_rpc_server_app diff --git a/fnet/src/examples/frt/rpc/CMakeLists.txt b/fnet/src/examples/frt/rpc/CMakeLists.txt new file mode 100644 index 00000000000..aae76bdcd4f --- /dev/null +++ b/fnet/src/examples/frt/rpc/CMakeLists.txt @@ -0,0 +1,60 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_executable(fnet_rpc_server_app + SOURCES + rpc_server.cpp + INSTALL bin + DEPENDS + fnet +) +vespa_add_executable(fnet_rpc_client_app + SOURCES + rpc_client.cpp + INSTALL bin + DEPENDS + fnet +) +vespa_add_executable(fnet_echo_client_app + SOURCES + echo_client.cpp + INSTALL bin + DEPENDS + fnet +) +vespa_add_executable(fnet_rpc_info_app + SOURCES + rpc_info.cpp + OUTPUT_NAME rpc_info + INSTALL bin + DEPENDS + fnet +) +vespa_add_executable(fnet_rpc_proxy_app + SOURCES + rpc_proxy.cpp + OUTPUT_NAME rpc_proxy + INSTALL bin + DEPENDS + fnet +) +vespa_add_executable(fnet_rpc_callback_server_app + SOURCES + rpc_callback_server.cpp + INSTALL bin + DEPENDS + fnet +) +vespa_add_executable(fnet_rpc_callback_client_app + SOURCES + rpc_callback_client.cpp + INSTALL bin + DEPENDS + fnet +) +vespa_add_executable(fnet_rpc_invoke_app + SOURCES + rpc_invoke.cpp + OUTPUT_NAME rpc_invoke + INSTALL bin + DEPENDS + fnet +) diff --git a/fnet/src/examples/frt/rpc/echo_client.cpp b/fnet/src/examples/frt/rpc/echo_client.cpp new file mode 100644 index 00000000000..f34f4c65111 --- /dev/null +++ b/fnet/src/examples/frt/rpc/echo_client.cpp @@ -0,0 +1,94 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> +#include <vespa/log/log.h> +LOG_SETUP("echo_client"); +#include <vespa/fnet/frt/frt.h> + +class EchoClient : public FastOS_Application +{ +public: + virtual int Main(); +}; + +int +EchoClient::Main() +{ + if (_argc < 2) { + printf("usage : echo_client <connectspec>\n"); + return 1; + } + FRT_Supervisor supervisor; + + supervisor.Start(); + FRT_Target *target = supervisor.GetTarget(_argv[1]); + FRT_RPCRequest *req = supervisor.AllocRPCRequest(); + FRT_Values *args = req->GetParams(); + req->SetMethodName("frt.rpc.echo"); + args->EnsureFree(16); + + args->AddInt8(8); + uint8_t *pt_int8 = args->AddInt8Array(3); + pt_int8[0] = 1; + pt_int8[1] = 2; + pt_int8[2] = 3; + + args->AddInt16(16); + uint16_t *pt_int16 = args->AddInt16Array(3); + pt_int16[0] = 2; + pt_int16[1] = 4; + pt_int16[2] = 6; + + args->AddInt32(32); + uint32_t *pt_int32 = args->AddInt32Array(3); + pt_int32[0] = 4; + pt_int32[1] = 8; + pt_int32[2] = 12; + + args->AddInt64(64); + uint64_t *pt_int64 = args->AddInt64Array(3); + pt_int64[0] = 8; + pt_int64[1] = 16; + pt_int64[2] = 24; + + args->AddFloat(32.5); + float *pt_float = args->AddFloatArray(3); + pt_float[0] = 0.25; + pt_float[1] = 0.5; + pt_float[2] = 0.75; + + args->AddDouble(64.5); + double *pt_double = args->AddDoubleArray(3); + pt_double[0] = 0.1; + pt_double[1] = 0.2; + pt_double[2] = 0.3; + + args->AddString("string"); + FRT_StringValue *pt_string = args->AddStringArray(3); + args->SetString(&pt_string[0], "str1"); + args->SetString(&pt_string[1], "str2"); + args->SetString(&pt_string[2], "str3"); + + args->AddData("data", 4); + FRT_DataValue *pt_data = args->AddDataArray(3); + args->SetData(&pt_data[0], "dat1", 4); + args->SetData(&pt_data[1], "dat2", 4); + args->SetData(&pt_data[2], "dat3", 4); + + target->InvokeSync(req, 5.0); // Invoke + req->Print(); // Dump request data + if (req->GetReturn()->Equals(req->GetParams())) { + printf("Return values == parameters.\n"); + } else { + printf("Return values != parameters.\n"); + } + req->SubRef(); + supervisor.ShutDown(true); + return 0; +} + +int +main(int argc, char **argv) +{ + EchoClient myapp; + return myapp.Entry(argc, argv); +} diff --git a/fnet/src/examples/frt/rpc/rpc_callback_client.cpp b/fnet/src/examples/frt/rpc/rpc_callback_client.cpp new file mode 100644 index 00000000000..479eb13fd43 --- /dev/null +++ b/fnet/src/examples/frt/rpc/rpc_callback_client.cpp @@ -0,0 +1,107 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> +#include <vespa/log/log.h> +LOG_SETUP("rpc_callback_client"); +#include <vespa/fnet/frt/frt.h> + + +struct RPC : public FRT_Invokable +{ + uint32_t invokeCnt; + RPC() : invokeCnt(0) {} + void Prod(FRT_RPCRequest *req); + void Init(FRT_Supervisor *s); +}; + +void +RPC::Prod(FRT_RPCRequest *req) +{ + (void) req; + ++invokeCnt; +} + +void +RPC::Init(FRT_Supervisor *s) +{ + FRT_ReflectionBuilder rb(s); + //------------------------------------------------------------------- + rb.DefineMethod("prod", "", "", true, + FRT_METHOD(RPC::Prod), this); + //------------------------------------------------------------------- +} + + +class MyApp : public FastOS_Application +{ +public: + virtual int Main(); +}; + +int +MyApp::Main() +{ + if (_argc < 2) { + printf("usage : rpc_server <connectspec>\n"); + return 1; + } + RPC rpc; + FRT_Supervisor orb; + rpc.Init(&orb); + orb.Start(); + + FRT_Target *target = orb.Get2WayTarget(_argv[1]); + FRT_RPCRequest *req = orb.AllocRPCRequest(); + + printf("invokeCnt: %d\n", rpc.invokeCnt); + + req->SetMethodName("callBack"); + req->GetParams()->AddString("prod"); + target->InvokeSync(req, 10.0); + + if(req->IsError()) { + printf("[error(%d): %s]\n", + req->GetErrorCode(), + req->GetErrorMessage()); + } + + printf("invokeCnt: %d\n", rpc.invokeCnt); + + req = orb.AllocRPCRequest(req); + req->SetMethodName("callBack"); + req->GetParams()->AddString("prod"); + target->InvokeSync(req, 10.0); + + if(req->IsError()) { + printf("[error(%d): %s]\n", + req->GetErrorCode(), + req->GetErrorMessage()); + } + + printf("invokeCnt: %d\n", rpc.invokeCnt); + + req = orb.AllocRPCRequest(req); + req->SetMethodName("callBack"); + req->GetParams()->AddString("prod"); + target->InvokeSync(req, 10.0); + + if(req->IsError()) { + printf("[error(%d): %s]\n", + req->GetErrorCode(), + req->GetErrorMessage()); + } + + printf("invokeCnt: %d\n", rpc.invokeCnt); + + req->SubRef(); + target->SubRef(); + orb.ShutDown(true); + return 0; +} + + +int +main(int argc, char **argv) +{ + MyApp myapp; + return myapp.Entry(argc, argv); +} diff --git a/fnet/src/examples/frt/rpc/rpc_callback_server.cpp b/fnet/src/examples/frt/rpc/rpc_callback_server.cpp new file mode 100644 index 00000000000..05d3a205a29 --- /dev/null +++ b/fnet/src/examples/frt/rpc/rpc_callback_server.cpp @@ -0,0 +1,69 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> +#include <vespa/log/log.h> +LOG_SETUP("rpc_callback_server"); +#include <vespa/fnet/frt/frt.h> + + +struct RPC : public FRT_Invokable +{ + void CallBack(FRT_RPCRequest *req); + void Init(FRT_Supervisor *s); +}; + +void +RPC::CallBack(FRT_RPCRequest *req) +{ + FNET_Connection *conn = req->GetConnection(); + FRT_RPCRequest *cb = new FRT_RPCRequest(); + cb->SetMethodName(req->GetParams()->GetValue(0)._string._str); + FRT_Supervisor::InvokeSync(conn->Owner(), conn, cb, 5.0); + if(cb->IsError()) { + printf("[error(%d): %s]\n", + cb->GetErrorCode(), + cb->GetErrorMessage()); + } + cb->SubRef(); +} + +void +RPC::Init(FRT_Supervisor *s) +{ + FRT_ReflectionBuilder rb(s); + //------------------------------------------------------------------- + rb.DefineMethod("callBack", "s", "", false, + FRT_METHOD(RPC::CallBack), this); + //------------------------------------------------------------------- +} + + +class MyApp : public FastOS_Application +{ +public: + virtual int Main(); +}; + +int +MyApp::Main() +{ + FNET_SignalShutDown::hookSignals(); + if (_argc < 2) { + printf("usage : rpc_server <listenspec>\n"); + return 1; + } + RPC rpc; + FRT_Supervisor orb; + rpc.Init(&orb); + orb.Listen(_argv[1]); + FNET_SignalShutDown ssd(*orb.GetTransport()); + orb.Main(); + return 0; +} + + +int +main(int argc, char **argv) +{ + MyApp myapp; + return myapp.Entry(argc, argv); +} diff --git a/fnet/src/examples/frt/rpc/rpc_client.cpp b/fnet/src/examples/frt/rpc/rpc_client.cpp new file mode 100644 index 00000000000..cde39270698 --- /dev/null +++ b/fnet/src/examples/frt/rpc/rpc_client.cpp @@ -0,0 +1,92 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> +#include <vespa/log/log.h> +LOG_SETUP("rpc_client"); +#include <vespa/fnet/frt/frt.h> + + +class RPCClient : public FastOS_Application +{ +public: + virtual int Main(); +}; + +int +RPCClient::Main() +{ + if (_argc < 2) { + printf("usage : rpc_client <connectspec>\n"); + return 1; + } + FRT_Supervisor supervisor; + + supervisor.Start(); + FRT_Target *target = supervisor.GetTarget(_argv[1]); + + const char *str1 = "abc"; + const char *str2 = "def"; + float float1 = 20.5; + float float2 = 60.5; + double double1 = 25.5; + double double2 = 5.5; + + fprintf(stdout, "\nTesting concat method\n"); + FRT_RPCRequest *req = supervisor.AllocRPCRequest(); + req->SetMethodName("concat"); + req->GetParams()->AddString(str1); + req->GetParams()->AddString(str2); + target->InvokeSync(req, 5.0); + if (req->GetErrorCode() == FRTE_NO_ERROR) { + fprintf(stdout, "%s + %s = %s\n", str1, str2, + req->GetReturn()->GetValue(0)._string._str); + } else { + fprintf(stdout, "error(%d): %s\n", + req->GetErrorCode(), + req->GetErrorMessage()); + } + + fprintf(stdout, "\nTesting addFloat method\n"); + req->SubRef(); + req = supervisor.AllocRPCRequest(); + req->SetMethodName("addFloat"); + req->GetParams()->AddFloat(float1); + req->GetParams()->AddFloat(float2); + target->InvokeSync(req, 5.0); + if (req->GetErrorCode() == FRTE_NO_ERROR) { + fprintf(stdout, "%f + %f = %f\n", float1, float2, + req->GetReturn()->GetValue(0)._float); + } else { + fprintf(stdout, "error(%d): %s\n", + req->GetErrorCode(), + req->GetErrorMessage()); + } + + fprintf(stdout, "\nTesting addDouble method\n"); + req->SubRef(); + req = supervisor.AllocRPCRequest(); + req->SetMethodName("addDouble"); + req->GetParams()->AddDouble(double1); + req->GetParams()->AddDouble(double2); + target->InvokeSync(req, 5.0); + if (req->GetErrorCode() == FRTE_NO_ERROR) { + fprintf(stdout, "%f + %f = %f\n", double1, double2, + req->GetReturn()->GetValue(0)._double); + } else { + fprintf(stdout, "error(%d): %s\n", + req->GetErrorCode(), + req->GetErrorMessage()); + } + + req->SubRef(); + target->SubRef(); + supervisor.ShutDown(true); + return 0; +} + + +int +main(int argc, char **argv) +{ + RPCClient myapp; + return myapp.Entry(argc, argv); +} diff --git a/fnet/src/examples/frt/rpc/rpc_info.cpp b/fnet/src/examples/frt/rpc/rpc_info.cpp new file mode 100644 index 00000000000..af15a19b6a4 --- /dev/null +++ b/fnet/src/examples/frt/rpc/rpc_info.cpp @@ -0,0 +1,140 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> +#include <vespa/log/log.h> +LOG_SETUP("rpc_info"); +#include <vespa/fnet/frt/frt.h> + +class RPCInfo : public FastOS_Application +{ +public: + + void GetReq(FRT_RPCRequest **req, FRT_Supervisor *supervisor) + { + if ((*req) != NULL) + (*req)->SubRef(); + (*req) = supervisor->AllocRPCRequest(); + } + + void FreeReqs(FRT_RPCRequest *r1, FRT_RPCRequest *r2) + { + if (r1 != NULL) + r1->SubRef(); + if (r2 != NULL) + r2->SubRef(); + } + + void DumpMethodInfo(const char *indent, FRT_RPCRequest *info, + const char *name) + { + if (info->IsError()) { + printf("%sMETHOD %s\n", indent, name); + printf("%s [error(%d): %s]\n\n", indent, + info->GetErrorCode(), + info->GetErrorMessage()); + return; + } + + const char *desc = info->GetReturn()->GetValue(0)._string._str; + const char *arg = info->GetReturn()->GetValue(1)._string._str; + const char *ret = info->GetReturn()->GetValue(2)._string._str; + uint32_t argCnt = strlen(arg); + uint32_t retCnt = strlen(ret); + FRT_StringValue *argName = info->GetReturn()->GetValue(3)._string_array._pt; + FRT_StringValue *argDesc = info->GetReturn()->GetValue(4)._string_array._pt; + FRT_StringValue *retName = info->GetReturn()->GetValue(5)._string_array._pt; + FRT_StringValue *retDesc = info->GetReturn()->GetValue(6)._string_array._pt; + + printf("%sMETHOD %s\n", indent, name); + printf("%s DESCRIPTION:\n" + "%s %s\n", indent, indent, desc); + + if (argCnt > 0) { + printf("%s PARAMS:\n", indent); + for (uint32_t a = 0; a < argCnt; a++) + printf("%s [%c][%s] %s\n", indent, arg[a], argName[a]._str, + argDesc[a]._str); + } + + if (retCnt > 0) { + printf("%s RETURN:\n", indent); + for (uint32_t r = 0; r < retCnt; r++) + printf("%s [%c][%s] %s\n", indent, ret[r], retName[r]._str, + retDesc[r]._str); + } + printf("\n"); + } + + virtual int Main(); +}; + + +int +RPCInfo::Main() +{ + if (_argc < 2) { + printf("usage : rpc_info <connectspec> [verbose]\n"); + return 1; + } + + bool verbose = (_argc > 2 && strcmp(_argv[2], "verbose") == 0); + FRT_Supervisor supervisor; + FRT_Target *target = supervisor.GetTarget(_argv[1]); + FRT_RPCRequest *m_list = NULL; + FRT_RPCRequest *info = NULL; + supervisor.Start(); + + GetReq(&info, &supervisor); + info->SetMethodName("frt.rpc.ping"); + target->InvokeSync(info, 5.0); + if (info->IsError()) { + fprintf(stderr, "Error talking to %s\n", _argv[1]); + FreeReqs(m_list, info); + supervisor.ShutDown(true); + return 1; + } + + GetReq(&m_list, &supervisor); + m_list->SetMethodName("frt.rpc.getMethodList"); + target->InvokeSync(m_list, 5.0); + + if (!m_list->IsError()) { + + uint32_t numMethods = m_list->GetReturn()->GetValue(0)._string_array._len; + FRT_StringValue *methods = m_list->GetReturn()->GetValue(0)._string_array._pt; + FRT_StringValue *arglist = m_list->GetReturn()->GetValue(1)._string_array._pt; + FRT_StringValue *retlist = m_list->GetReturn()->GetValue(2)._string_array._pt; + + for (uint32_t m = 0; m < numMethods; m++) { + + if (verbose) { + + GetReq(&info, &supervisor); + info->SetMethodName("frt.rpc.getMethodInfo"); + info->GetParams()->AddString(methods[m]._str); + target->InvokeSync(info, 5.0); + DumpMethodInfo("", info, methods[m]._str); + + } else { + + printf("METHOD [%s] <- %s <- [%s]\n", + retlist[m]._str, methods[m]._str, arglist[m]._str); + } + } + } else { + fprintf(stderr, " [error(%d): %s]\n", + m_list->GetErrorCode(), + m_list->GetErrorMessage()); + } + FreeReqs(m_list, info); + target->SubRef(); + supervisor.ShutDown(true); + return 0; +} + + +int +main(int argc, char **argv) +{ + RPCInfo myapp; + return myapp.Entry(argc, argv); +} diff --git a/fnet/src/examples/frt/rpc/rpc_invoke.cpp b/fnet/src/examples/frt/rpc/rpc_invoke.cpp new file mode 100644 index 00000000000..471fd9a879f --- /dev/null +++ b/fnet/src/examples/frt/rpc/rpc_invoke.cpp @@ -0,0 +1,107 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> +#include <vespa/log/log.h> +LOG_SETUP("rpc_invoke"); +#include <vespa/fnet/frt/frt.h> + +class RPCClient : public FastOS_Application +{ +private: + static bool addArg(FRT_RPCRequest *req, const char *param) { + int len = strlen(param); + if (len < 2 || param[1] != ':') { + return false; + } + const char *value = param + 2; + switch (param[0]) { + case 'b': + req->GetParams()->AddInt8(strtoll(value, NULL, 0)); + break; + case 'h': + req->GetParams()->AddInt16(strtoll(value, NULL, 0)); + break; + case 'i': + req->GetParams()->AddInt32(strtoll(value, NULL, 0)); + break; + case 'l': + req->GetParams()->AddInt64(strtoll(value, NULL, 0)); + break; + case 'f': + req->GetParams()->AddFloat(strtod(value, NULL)); + break; + case 'd': + req->GetParams()->AddDouble(strtod(value, NULL)); + break; + case 's': + req->GetParams()->AddString(value); + break; + default: + return false; + } + return true; + } + +public: + virtual int Main(); +}; + +int +RPCClient::Main() +{ + if (_argc < 3) { + fprintf(stderr, "usage: rpc_invoke [-t timeout] <connectspec> <method> [args]\n"); + fprintf(stderr, " -t timeout in seconds\n"); + fprintf(stderr, " Each arg must be on the form <type>:<value>\n"); + fprintf(stderr, " supported types: {'b','h','i','l','f','d','s'}\n"); + return 1; + } + int retCode = 0; + FRT_Supervisor supervisor; + supervisor.Start(); + int targetArg = 1; + int methNameArg = 2; + int startOfArgs = 3; + int timeOut = 10; + if (strcmp(_argv[1], "-t") == 0) { + timeOut = atoi(_argv[2]); + targetArg = 3; + methNameArg = 4; + startOfArgs = 5; + } + FRT_Target *target = supervisor.GetTarget(_argv[targetArg]); + FRT_RPCRequest *req = supervisor.AllocRPCRequest(); + req->SetMethodName(_argv[methNameArg]); + for (int i = startOfArgs; i < _argc; ++i) { + if (!addArg(req, _argv[i])) { + fprintf(stderr, "could not parse parameter: '%s'\n", _argv[i]); + retCode = 2; + break; + } + } + if (retCode == 0) { + fprintf(stdout, "PARAMETERS:\n"); + req->GetParams()->Print(); + target->InvokeSync(req, (double)timeOut); + if (req->GetErrorCode() == FRTE_NO_ERROR) { + fprintf(stdout, "RETURN VALUES:\n"); + req->GetReturn()->Print(); + } else { + fprintf(stderr, "error(%d): %s\n", + req->GetErrorCode(), + req->GetErrorMessage()); + retCode = 3; + } + } + req->SubRef(); + target->SubRef(); + supervisor.ShutDown(true); + return retCode; +} + + +int +main(int argc, char **argv) +{ + RPCClient myapp; + return myapp.Entry(argc, argv); +} diff --git a/fnet/src/examples/frt/rpc/rpc_proxy.cpp b/fnet/src/examples/frt/rpc/rpc_proxy.cpp new file mode 100644 index 00000000000..dd29255093a --- /dev/null +++ b/fnet/src/examples/frt/rpc/rpc_proxy.cpp @@ -0,0 +1,254 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> +#include <vespa/log/log.h> +LOG_SETUP("rpc_proxy"); +#include <vespa/fnet/frt/frt.h> + +//----------------------------------------------------------------------------- + +struct Session +{ + FNET_Connection *client; + FRT_Target *server; + uint32_t id; + uint32_t finiCnt; + + Session(uint32_t xid) : client(NULL), server(NULL), id(xid), finiCnt(0) {} + ~Session() { assert(client == NULL && server == NULL && finiCnt == 2); } + +private: + Session(const Session &); + Session &operator=(const Session &); +}; + +//----------------------------------------------------------------------------- + +class RPCProxy : public FRT_Invokable +{ +private: + FRT_Supervisor &_supervisor; + const char *_spec; + bool _verbose; + uint32_t _currID; + char _prefixStr[256]; + + RPCProxy(const RPCProxy &); + RPCProxy &operator=(const RPCProxy &); + +public: + RPCProxy(FRT_Supervisor &supervisor, + const char *spec, + bool verbose) + : _supervisor(supervisor), + _spec(spec), + _verbose(verbose), + _currID(0), + _prefixStr() {} + + bool IsVerbose() const { return _verbose; } + const char *GetPrefix(FRT_RPCRequest *req); + void PrintMethod(FRT_RPCRequest *req, const char *desc); + void Done(FRT_RPCRequest *req); + void HOOK_Mismatch(FRT_RPCRequest *req); + void HOOK_Init(FRT_RPCRequest *req); + void HOOK_Down(FRT_RPCRequest *req); + void HOOK_Fini(FRT_RPCRequest *req); + static Session *GetSession(FRT_RPCRequest *req) + { + return (Session *) req->GetConnection()->GetContext()._value.VOIDP; + } +}; + +//----------------------------------------------------------------------------- + +class ReqDone : public FRT_IRequestWait +{ +private: + RPCProxy &_proxy; + +public: + ReqDone(RPCProxy &proxy) : _proxy(proxy) {} + virtual void RequestDone(FRT_RPCRequest *req); +}; + +void +ReqDone::RequestDone(FRT_RPCRequest *req) +{ + _proxy.Done(req); +} + +//----------------------------------------------------------------------------- + +const char * +RPCProxy::GetPrefix(FRT_RPCRequest *req) +{ + FastOS_Time t; + tm currTime; + tm *currTimePt; + + t.SetNow(); + time_t secs = t.Secs(); + currTimePt = localtime_r(&secs, &currTime); + assert(currTimePt == &currTime); + (void) currTimePt; + + char rid[32]; + if (req->GetContext()._value.CHANNEL != NULL) { + sprintf(rid, "[rid=%u]", req->GetContext()._value.CHANNEL->GetID()); + } else { + rid[0] = '\0'; + } + + sprintf(_prefixStr, "[%04d-%02d-%02d %02d:%02d:%02d:%03d][sid=%u]%s", + currTime.tm_year + 1900, + currTime.tm_mon + 1, + currTime.tm_mday, + currTime.tm_hour, + currTime.tm_min, + currTime.tm_sec, + (int)(t.GetMicroSeconds() / 1000), + GetSession(req)->id, + rid); + + return _prefixStr; +} + + +void +RPCProxy::PrintMethod(FRT_RPCRequest *req, const char *desc) +{ + fprintf(stdout, "%s %s: %s\n", GetPrefix(req), desc, + req->GetMethodName()); +} + + +void +RPCProxy::Done(FRT_RPCRequest *req) +{ + PrintMethod(req, "RETURN"); + if (IsVerbose()) { + req->GetReturn()->Print(8); + } + req->Return(); +} + + +void +RPCProxy::HOOK_Mismatch(FRT_RPCRequest *req) +{ + PrintMethod(req, "INVOKE"); + if (IsVerbose()) { + req->GetParams()->Print(8); + } + req->Detach(); + req->SetError(FRTE_NO_ERROR, ""); + if (req->GetConnection()->IsServer() && + GetSession(req)->server != NULL) + { + GetSession(req)->server->InvokeAsync(req, 60.0, + new (req->GetMemoryTub()) + ReqDone(*this)); + } else if (req->GetConnection()->IsClient() && + GetSession(req)->client != NULL) + { + FRT_Supervisor::InvokeAsync(GetSession(req)->client->Owner(), + GetSession(req)->client, + req, 60.0, + new (req->GetMemoryTub()) + ReqDone(*this)); + } else { + req->SetError(FRTE_RPC_CONNECTION); + req->Return(); + } +} + + +void +RPCProxy::HOOK_Init(FRT_RPCRequest *req) +{ + if (req->GetConnection()->IsClient()) { + return; + } + Session *session = new Session(_currID++); + session->client = req->GetConnection(); + session->server = + _supervisor.Get2WayTarget(_spec, + FNET_Context((void *) session)); + session->client->SetContext(FNET_Context((void *) session)); + if (session->server->GetConnection() == NULL || + session->server->GetConnection()->GetState() + > FNET_Connection::FNET_CONNECTED) + { + session->finiCnt = 1; + session->client->Owner()->Close(session->client); + } + fprintf(stdout, "%s INIT\n", GetPrefix(req)); +} + + +void +RPCProxy::HOOK_Down(FRT_RPCRequest *req) +{ + Session *session = GetSession(req); + if (req->GetConnection()->IsClient()) { + if (session->client != NULL) { + session->client->Owner()->Close(session->client); + } + } else { + session->server->SubRef(); + session->client = NULL; + session->server = NULL; + } +} + + +void +RPCProxy::HOOK_Fini(FRT_RPCRequest *req) +{ + if (++GetSession(req)->finiCnt == 2) { + fprintf(stdout, "%s FINI\n", GetPrefix(req)); + delete GetSession(req); + } +} + +//----------------------------------------------------------------------------- + +class App : public FastOS_Application +{ +public: + virtual int Main(); +}; + +int +App::Main() +{ + FNET_SignalShutDown::hookSignals(); + // would like to turn off FNET logging somehow + if (_argc < 3) { + fprintf(stderr, "usage: %s <listenspec> <connectspec> [verbose]\n", _argv[0]); + return 1; + } + bool verbose = (_argc > 3) && (strcmp(_argv[3], "verbose") == 0); + + FRT_Supervisor supervisor; + RPCProxy proxy(supervisor, _argv[2], verbose); + + supervisor.GetReflectionManager()->Reset(); + supervisor.SetSessionInitHook(FRT_METHOD(RPCProxy::HOOK_Init), &proxy); + supervisor.SetSessionDownHook(FRT_METHOD(RPCProxy::HOOK_Down), &proxy); + supervisor.SetSessionFiniHook(FRT_METHOD(RPCProxy::HOOK_Fini), &proxy); + supervisor.SetMethodMismatchHook(FRT_METHOD(RPCProxy::HOOK_Mismatch), + &proxy); + supervisor.Listen(_argv[1]); + FNET_SignalShutDown ssd(*supervisor.GetTransport()); + supervisor.Main(); + return 0; +} + + +int +main(int argc, char **argv) +{ + App app; + return app.Entry(argc, argv); +} diff --git a/fnet/src/examples/frt/rpc/rpc_server.cpp b/fnet/src/examples/frt/rpc/rpc_server.cpp new file mode 100644 index 00000000000..5e8dd744fdc --- /dev/null +++ b/fnet/src/examples/frt/rpc/rpc_server.cpp @@ -0,0 +1,125 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> +#include <vespa/log/log.h> +LOG_SETUP("rpc_server"); +#include <vespa/fnet/frt/frt.h> + + +class RPCServer : public FRT_Invokable +{ +private: + FRT_Supervisor *_supervisor; + + RPCServer(const RPCServer &); + RPCServer &operator=(const RPCServer &); + +public: + RPCServer() : _supervisor(NULL) {} + void InitRPC(FRT_Supervisor *s); + void RPC_concat(FRT_RPCRequest *req); + void RPC_addFloat(FRT_RPCRequest *req); + void RPC_addDouble(FRT_RPCRequest *req); + int Main(int argc, char **argv); +}; + +void +RPCServer::InitRPC(FRT_Supervisor *s) +{ + FRT_ReflectionBuilder rb(s); + //------------------------------------------------------------------- + rb.DefineMethod("concat", "ss", "s", true, + FRT_METHOD(RPCServer::RPC_concat), this); + rb.MethodDesc("Concatenate two strings"); + rb.ParamDesc("string1", "a string"); + rb.ParamDesc("string2", "another string"); + rb.ReturnDesc("ret", "the concatenation of string1 and string2"); + //------------------------------------------------------------------- + rb.DefineMethod("addFloat", "ff", "f", true, + FRT_METHOD(RPCServer::RPC_addFloat), this); + rb.MethodDesc("Add two floats"); + rb.ParamDesc("float1", "a float"); + rb.ParamDesc("float2", "another float"); + rb.ReturnDesc("ret", "float1 + float2"); + //------------------------------------------------------------------- + rb.DefineMethod("addDouble", "dd", "d", true, + FRT_METHOD(RPCServer::RPC_addDouble), this); + rb.MethodDesc("Add two doubles"); + rb.ParamDesc("double1", "a double"); + rb.ParamDesc("double2", "another double"); + rb.ReturnDesc("ret", "double1 + double2"); + //------------------------------------------------------------------- +} + +void +RPCServer::RPC_concat(FRT_RPCRequest *req) +{ + FRT_Values ¶ms = *req->GetParams(); + FRT_Values &ret = *req->GetReturn(); + + uint32_t len = (params[0]._string._len + + params[1]._string._len); + char *tmp = ret.AddString(len); + strcpy(tmp, params[0]._string._str); + strcat(tmp, params[1]._string._str); +} + +void +RPCServer::RPC_addFloat(FRT_RPCRequest *req) +{ + FRT_Values ¶ms = *req->GetParams(); + FRT_Values &ret = *req->GetReturn(); + + ret.AddFloat(params[0]._float + params[1]._float); +} + +void +RPCServer::RPC_addDouble(FRT_RPCRequest *req) +{ + FRT_Values ¶ms = *req->GetParams(); + FRT_Values &ret = *req->GetReturn(); + + ret.AddDouble(params[0]._double + params[1]._double); +} + +int +RPCServer::Main(int argc, char **argv) +{ + FNET_SignalShutDown::hookSignals(); + if (argc < 2) { + printf("usage : rpc_server <listenspec>\n"); + return 1; + } + + _supervisor = new FRT_Supervisor(); + InitRPC(_supervisor); + _supervisor->Listen(argv[1]); + FNET_SignalShutDown ssd(*_supervisor->GetTransport()); + _supervisor->Main(); + delete _supervisor; + return 0; +} + + +class App : public FastOS_Application +{ +private: + RPCServer _server; + +public: + App() : _server() {} + virtual int Main(); +}; + +int +App::Main() +{ + return _server.Main(_argc, _argv); +} + + +int +main(int argc, char **argv) +{ + App myapp; + return myapp.Entry(argc, argv); +} diff --git a/fnet/src/examples/ping/.gitignore b/fnet/src/examples/ping/.gitignore new file mode 100644 index 00000000000..96bd86822ad --- /dev/null +++ b/fnet/src/examples/ping/.gitignore @@ -0,0 +1,9 @@ +*.core +*.ilk +*.pdb +.depend +Makefile +pingclient +pingserver +fnet_pingclient_app +fnet_pingserver_app diff --git a/fnet/src/examples/ping/CMakeLists.txt b/fnet/src/examples/ping/CMakeLists.txt new file mode 100644 index 00000000000..7354ec9cf48 --- /dev/null +++ b/fnet/src/examples/ping/CMakeLists.txt @@ -0,0 +1,17 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_executable(fnet_pingserver_app + SOURCES + packets.cpp + pingserver.cpp + INSTALL bin + DEPENDS + fnet +) +vespa_add_executable(fnet_pingclient_app + SOURCES + packets.cpp + pingclient.cpp + INSTALL bin + DEPENDS + fnet +) diff --git a/fnet/src/examples/ping/packets.cpp b/fnet/src/examples/ping/packets.cpp new file mode 100644 index 00000000000..8958666b374 --- /dev/null +++ b/fnet/src/examples/ping/packets.cpp @@ -0,0 +1,65 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/fastos/fastos.h> +#include <vespa/fnet/fnet.h> +#include "packets.h" + +uint32_t +PingRequest::GetPCODE() +{ + return PCODE_PING_REQUEST; +} + +uint32_t +PingRequest::GetLength() +{ + return 0; +} + +void +PingRequest::Encode(FNET_DataBuffer *) +{ +} + +bool +PingRequest::Decode(FNET_DataBuffer *src, uint32_t len) +{ + src->DataToDead(len); + return (len == 0); +} + + +uint32_t +PingReply::GetPCODE() +{ + return PCODE_PING_REPLY; +} + +uint32_t +PingReply::GetLength() +{ + return 0; +} + +void +PingReply::Encode(FNET_DataBuffer *) +{ +} + +bool +PingReply::Decode(FNET_DataBuffer *src, uint32_t len) +{ + src->DataToDead(len); + return (len == 0); +} + + +FNET_Packet * +PingPacketFactory::CreatePacket(uint32_t pcode, FNET_Context) +{ + switch(pcode) { + case PCODE_PING_REQUEST: return new PingRequest(); + case PCODE_PING_REPLY: return new PingReply(); + } + return NULL; +} diff --git a/fnet/src/examples/ping/packets.h b/fnet/src/examples/ping/packets.h new file mode 100644 index 00000000000..39dae75a16f --- /dev/null +++ b/fnet/src/examples/ping/packets.h @@ -0,0 +1,36 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + + +enum { + PCODE_PING_REQUEST = 1, + PCODE_PING_REPLY = 2 +}; + + +class PingRequest : public FNET_Packet +{ +public: + virtual uint32_t GetPCODE(); + virtual uint32_t GetLength(); + virtual void Encode(FNET_DataBuffer *); + virtual bool Decode(FNET_DataBuffer *src, uint32_t len); +}; + + +class PingReply : public FNET_Packet +{ +public: + virtual uint32_t GetPCODE(); + virtual uint32_t GetLength(); + virtual void Encode(FNET_DataBuffer *); + virtual bool Decode(FNET_DataBuffer *src, uint32_t len); +}; + + +class PingPacketFactory : public FNET_IPacketFactory +{ +public: + virtual FNET_Packet *CreatePacket(uint32_t pcode, FNET_Context); +}; + diff --git a/fnet/src/examples/ping/pingclient.cpp b/fnet/src/examples/ping/pingclient.cpp new file mode 100644 index 00000000000..075c0b5df32 --- /dev/null +++ b/fnet/src/examples/ping/pingclient.cpp @@ -0,0 +1,93 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> +#include <vespa/log/log.h> +LOG_SETUP("pingclient"); +#include <vespa/fnet/fnet.h> +#include <examples/ping/packets.h> + + +class PingClient : public FastOS_Application +{ +public: + int Main(); +}; + + +int +PingClient::Main() +{ + if (_argc < 2) { + printf("usage : pingclient <connectspec>\n"); + printf("example: pingclient 'tcp/localhost:8000'\n"); + return 1; + } + + FNET_PacketQueue queue; + FastOS_ThreadPool pool(65000); + PingPacketFactory factory; + FNET_SimplePacketStreamer streamer(&factory); + FNET_Transport transport; + FNET_Connection *conn = transport.Connect(_argv[1], &streamer); + FNET_Channel *channels[10]; + transport.Start(&pool); + + uint32_t channelCnt = 0; + for (uint32_t i = 0; i < 10; i++) { + channels[i] = (conn == NULL) ? NULL : conn->OpenChannel(&queue, FNET_Context(i)); + if (channels[i] == 0) { + fprintf(stderr, "Could not make channel[%d] to %s\n", i, _argv[1]); + break; + } + channelCnt++; + channels[i]->Send(new PingRequest()); + channels[i]->Sync(); + fprintf(stderr, "Sent ping in context %d\n", i); + } + + FNET_Packet *packet; + FNET_Context context; + while (channelCnt > 0) { + packet = queue.DequeuePacket(5000, &context); + if (packet == NULL) { + fprintf(stderr, "Timeout\n"); + for(int c = 0; c < 10; c++) { + if (channels[c] != NULL) { + channels[c]->Close(); + channels[c]->Free(); + channels[c] = NULL; + fprintf(stderr, "Closed channel with context %d\n", c); + } + } + break; + } + if (packet->GetPCODE() == PCODE_PING_REPLY) { + fprintf(stderr, "Got ping result in context %d\n", + context._value.INT); + } else if (packet->IsChannelLostCMD()) { + fprintf(stderr, "Lost channel with context %d\n", + context._value.INT); + } + if (channels[context._value.INT] != NULL) { + channels[context._value.INT]->Close(); + channels[context._value.INT]->Free(); + channels[context._value.INT] = NULL; + fprintf(stderr, "Closed channel with context %d\n", + context._value.INT); + channelCnt--; + } + packet->Free(); + } + if (conn != NULL) + conn->SubRef(); + transport.ShutDown(true); + pool.Close(); + return 0; +} + + +int +main(int argc, char **argv) +{ + PingClient myapp; + return myapp.Entry(argc, argv); +} diff --git a/fnet/src/examples/ping/pingserver.cpp b/fnet/src/examples/ping/pingserver.cpp new file mode 100644 index 00000000000..346307a017c --- /dev/null +++ b/fnet/src/examples/ping/pingserver.cpp @@ -0,0 +1,65 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> +#include <vespa/log/log.h> +LOG_SETUP("pingserver"); +#include <vespa/fnet/fnet.h> +#include <examples/ping/packets.h> + + +class PingServer : public FNET_IServerAdapter, + public FNET_IPacketHandler, + public FastOS_Application +{ +public: + bool InitAdminChannel(FNET_Channel *) { return false; } + bool InitChannel(FNET_Channel *channel, uint32_t) + { + channel->SetContext(FNET_Context(channel)); + channel->SetHandler(this); + return true; + } + + HP_RetCode HandlePacket(FNET_Packet *packet, FNET_Context context) + { + if (packet->GetPCODE() == PCODE_PING_REQUEST) { + fprintf(stderr, "Got ping request, sending ping reply\n"); + context._value.CHANNEL->Send(new PingReply()); + } + packet->Free(); + return FNET_FREE_CHANNEL; + } + + int Main(); +}; + + +int +PingServer::Main() +{ + FNET_SignalShutDown::hookSignals(); + if (_argc < 2) { + printf("usage : pingserver <listenspec>\n"); + printf("example: pingserver 'tcp/8000'\n"); + return 1; + } + + FNET_Transport transport; + PingPacketFactory factory; + FNET_SimplePacketStreamer streamer(&factory); + FNET_Connector *listener = + transport.Listen(_argv[1], &streamer, this); + if (listener != NULL) + listener->SubRef(); + + FNET_SignalShutDown ssd(transport); + transport.Main(); + return 0; +} + + +int +main(int argc, char **argv) +{ + PingServer myapp; + return myapp.Entry(argc, argv); +} diff --git a/fnet/src/examples/proxy/.gitignore b/fnet/src/examples/proxy/.gitignore new file mode 100644 index 00000000000..a2ec3289ec2 --- /dev/null +++ b/fnet/src/examples/proxy/.gitignore @@ -0,0 +1,4 @@ +.depend +Makefile +proxy +fnet_proxy_app diff --git a/fnet/src/examples/proxy/CMakeLists.txt b/fnet/src/examples/proxy/CMakeLists.txt new file mode 100644 index 00000000000..bc5dce755a7 --- /dev/null +++ b/fnet/src/examples/proxy/CMakeLists.txt @@ -0,0 +1,8 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_executable(fnet_proxy_app + SOURCES + proxy.cpp + INSTALL bin + DEPENDS + fnet +) diff --git a/fnet/src/examples/proxy/proxy.cpp b/fnet/src/examples/proxy/proxy.cpp new file mode 100644 index 00000000000..9a76077591f --- /dev/null +++ b/fnet/src/examples/proxy/proxy.cpp @@ -0,0 +1,244 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> +#include <vespa/log/log.h> +LOG_SETUP("proxy"); +#include <vespa/fnet/fnet.h> + + +class RawPacket : public FNET_Packet +{ +private: + FNET_DataBuffer _data; + +public: + RawPacket() : _data() {} + virtual uint32_t GetPCODE(); + virtual uint32_t GetLength(); + virtual void Encode(FNET_DataBuffer *); + virtual bool Decode(FNET_DataBuffer *src, uint32_t len); +}; + +uint32_t +RawPacket::GetPCODE() +{ + return 0; +} + +uint32_t +RawPacket::GetLength() +{ + return _data.GetDataLen(); +} + +void +RawPacket::Encode(FNET_DataBuffer *dst) +{ + dst->WriteBytes(_data.GetData(), _data.GetDataLen()); +} + +bool +RawPacket::Decode(FNET_DataBuffer *src, uint32_t len) +{ + _data.WriteBytes(src->GetData(), len); + src->DataToDead(len); + return true; +} + + +class Bridge : public FNET_IPacketHandler +{ +private: + FNET_Channel *_client; + FNET_Connection *_server; + + Bridge(const Bridge &); + Bridge &operator=(const Bridge &); + +public: + Bridge() : _client(NULL), _server(NULL) {} + + enum packet_source { + CLIENT = 0, + SERVER = 1 + }; + + void SetConns(FNET_Channel *client, + FNET_Connection *server) + { + _client = client; + _server = server; + } + + virtual HP_RetCode HandlePacket(FNET_Packet *packet, + FNET_Context context); +}; + + +FNET_IPacketHandler::HP_RetCode +Bridge::HandlePacket(FNET_Packet *packet, FNET_Context context) +{ + HP_RetCode ret = FNET_KEEP_CHANNEL; + + if (packet->IsChannelLostCMD()) { + + if (context._value.INT == CLIENT) { + + if (_server != NULL) { + LOG(info, "client connection lost"); + _server->Owner()->Close(_server); + } + ret = FNET_FREE_CHANNEL; + _client = NULL; + + } else if (context._value.INT == SERVER) { + + if (_client != NULL) { + LOG(info, "server connection lost"); + _client->GetConnection()->Owner()->Close(_client->GetConnection()); + } + _server->SubRef(); + _server = NULL; + + } + + if (_client == NULL && _server == NULL) + delete this; + + } else { + + if (context._value.INT == CLIENT) { + if (_server != NULL) + _server->PostPacket(packet, FNET_NOID); + else + packet->Free(); + + } else if (context._value.INT == SERVER) { + if (_client != NULL) + _client->Send(packet); + else + packet->Free(); + + } + } + + // The admin channel on a client connection (in this case, the + // connection with the server) are freed when the connection + // object is destructed. The admin channel on a server connection + // however (in this case the channel connecting us with the + // client) must be treated as a normal channel. + + return ret; +} + + +class Proxy : public FNET_IServerAdapter, + public FNET_IPacketStreamer, + public FastOS_Application +{ +private: + FNET_Transport _transport; + +public: + Proxy() : _transport() {} + virtual bool GetPacketInfo(FNET_DataBuffer *src, uint32_t *plen, + uint32_t *pcode, uint32_t *chid, bool *); + virtual FNET_Packet *Decode(FNET_DataBuffer *src, uint32_t plen, + uint32_t pcode, FNET_Context); + virtual void Encode(FNET_Packet *packet, uint32_t chid, + FNET_DataBuffer *dst); + // --------------------------------------------- + virtual bool InitAdminChannel(FNET_Channel *channel); + virtual bool InitChannel(FNET_Channel *, uint32_t); + // --------------------------------------------- + virtual int Main(); +}; + + +bool +Proxy::GetPacketInfo(FNET_DataBuffer *src, uint32_t *plen, + uint32_t *pcode, uint32_t *chid, bool *) +{ + if (src->GetDataLen() == 0) { + return false; + } + *pcode = 0; + *plen = src->GetDataLen(); + *chid = FNET_NOID; + return true; +} + +FNET_Packet * +Proxy::Decode(FNET_DataBuffer *src, uint32_t plen, + uint32_t, FNET_Context) +{ + RawPacket *packet = new RawPacket(); + packet->Decode(src, plen); + return packet; +} + +void +Proxy::Encode(FNET_Packet *packet, uint32_t chid, + FNET_DataBuffer *dst) +{ + uint32_t pcode = packet->GetPCODE(); + uint32_t len = packet->GetLength(); + (void) pcode; + (void) chid; + (void) len; + packet->Encode(dst); +} + +// --------------------------------------------- + +bool +Proxy::InitAdminChannel(FNET_Channel *channel) +{ + Bridge *bridge = new Bridge(); + FNET_Connection *server = _transport.Connect(_argv[2], this, bridge, + FNET_Context(Bridge::SERVER)); + if (server == NULL) { + channel->GetConnection()->Owner()->Close(channel->GetConnection()); + delete bridge; + return false; + } + bridge->SetConns(channel, server); + channel->SetHandler(bridge); + channel->SetContext(FNET_Context((uint32_t)Bridge::CLIENT)); + return true; +} + +bool +Proxy::InitChannel(FNET_Channel *, uint32_t) +{ + return false; +} + +// --------------------------------------------- + +int +Proxy::Main() +{ + FNET_SignalShutDown::hookSignals(); + if (_argc != 3) { + fprintf(stderr, "usage: %s <listen spec> <target spec>\n", _argv[0]); + return 1; + } + + FNET_Connector *listener = + _transport.Listen(_argv[1], this, this); + if (listener != NULL) + listener->SubRef(); + + _transport.SetLogStats(true); + FNET_SignalShutDown ssd(_transport); + _transport.Main(); + return 0; +} + + +int +main(int argc, char **argv) +{ + Proxy myapp; + return myapp.Entry(argc, argv); +} diff --git a/fnet/src/examples/test/.gitignore b/fnet/src/examples/test/.gitignore new file mode 100644 index 00000000000..e69de29bb2d --- /dev/null +++ b/fnet/src/examples/test/.gitignore diff --git a/fnet/src/examples/timeout/.gitignore b/fnet/src/examples/timeout/.gitignore new file mode 100644 index 00000000000..017989a129b --- /dev/null +++ b/fnet/src/examples/timeout/.gitignore @@ -0,0 +1,4 @@ +.depend +Makefile +timeout +fnet_timeout_app diff --git a/fnet/src/examples/timeout/CMakeLists.txt b/fnet/src/examples/timeout/CMakeLists.txt new file mode 100644 index 00000000000..5b9514ab0c8 --- /dev/null +++ b/fnet/src/examples/timeout/CMakeLists.txt @@ -0,0 +1,8 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_executable(fnet_timeout_app + SOURCES + timeout.cpp + INSTALL bin + DEPENDS + fnet +) diff --git a/fnet/src/examples/timeout/timeout.cpp b/fnet/src/examples/timeout/timeout.cpp new file mode 100644 index 00000000000..39bfb96eb0f --- /dev/null +++ b/fnet/src/examples/timeout/timeout.cpp @@ -0,0 +1,93 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> +#include <vespa/log/log.h> +LOG_SETUP("timeout"); +#include <vespa/fnet/fnet.h> + + +class Timeout : public FNET_Task +{ +private: + FNET_PacketQueue *_queue; + + Timeout(const Timeout &); + Timeout &operator=(const Timeout &); + +public: + Timeout(FNET_Scheduler *scheduler, + FNET_PacketQueue *queue) + : FNET_Task(scheduler), + _queue(queue) + {} + + virtual void PerformTask(); +}; + + +void +Timeout::PerformTask() +{ + _queue->QueuePacket(&FNET_ControlPacket::Timeout, FNET_Context()); +} + + +class MyApp : public FastOS_Application +{ +public: + int Main(); +}; + + +int +MyApp::Main() +{ + double ms; + FastOS_Time t; + FNET_PacketQueue queue; + FastOS_ThreadPool pool(65000); + FNET_Transport transport; + Timeout timeout(transport.GetScheduler(), &queue); + transport.Start(&pool); + + // stable-state operation + FastOS_Thread::Sleep(500); + + FNET_Packet *packet; + FNET_Context context; + + fprintf(stderr, "scheduling timeout in 2 seconds...\n"); + t.SetNow(); + timeout.Schedule(2.0); // timeout in 2 seconds + + FastOS_Thread::Sleep(1000); + + timeout.Unschedule(); // cancel timeout + ms = t.MilliSecsToNow(); + + if (queue.GetPacketCnt_NoLock() == 0) + fprintf(stderr, "timeout canceled; no timeout packet delivered\n"); + fprintf(stderr, "time since timeout was scheduled: %f ms\n", ms); + + fprintf(stderr, "scheduling timeout in 2 seconds...\n"); + t.SetNow(); + timeout.Schedule(2.0); // timeout in 2 seconds + + packet = queue.DequeuePacket(&context); // wait for timeout + ms = t.MilliSecsToNow(); + + if (packet->IsTimeoutCMD()) + fprintf(stderr, "got timeout packet\n"); + fprintf(stderr, "time since timeout was scheduled: %f ms\n", ms); + + transport.ShutDown(true); + pool.Close(); + return 0; +} + + +int +main(int argc, char **argv) +{ + MyApp myapp; + return myapp.Entry(argc, argv); +} |