From bdfd0e49219fc4603a4946ad99b6336864998e71 Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Fri, 13 Jan 2017 00:02:55 +0100 Subject: replace memorytub with Stash. --- fnet/CMakeLists.txt | 1 - fnet/src/examples/frt/rpc/rpc_proxy.cpp | 4 +- fnet/src/testlist.txt | 1 - fnet/src/tests/frt/memorytub/.gitignore | 6 - fnet/src/tests/frt/memorytub/CMakeLists.txt | 8 -- fnet/src/tests/frt/memorytub/DESC | 1 - fnet/src/tests/frt/memorytub/FILES | 1 - fnet/src/tests/frt/memorytub/memorytub.cpp | 124 -------------------- fnet/src/tests/frt/rpc/invoke.cpp | 11 +- fnet/src/tests/frt/rpc/sharedblob.cpp | 7 +- fnet/src/tests/frt/values/values_test.cpp | 16 ++- fnet/src/tests/printstuff/printstuff_test.cpp | 12 +- fnet/src/tests/regress/frt/memorytub/.gitignore | 0 fnet/src/vespa/fnet/CMakeLists.txt | 1 + fnet/src/vespa/fnet/channel.h | 2 - fnet/src/vespa/fnet/connection.cpp | 49 +++++++- fnet/src/vespa/fnet/connection.h | 46 +++----- fnet/src/vespa/fnet/context.cpp | 12 ++ fnet/src/vespa/fnet/context.h | 14 +-- fnet/src/vespa/fnet/dummypacket.h | 2 + fnet/src/vespa/fnet/frt/CMakeLists.txt | 1 - fnet/src/vespa/fnet/frt/frt.h | 2 - fnet/src/vespa/fnet/frt/invokable.h | 2 + fnet/src/vespa/fnet/frt/invoker.cpp | 49 +++++++- fnet/src/vespa/fnet/frt/invoker.h | 53 +++------ fnet/src/vespa/fnet/frt/isharedblob.h | 1 + fnet/src/vespa/fnet/frt/memorytub.cpp | 42 ------- fnet/src/vespa/fnet/frt/memorytub.h | 149 ------------------------ fnet/src/vespa/fnet/frt/packets.cpp | 2 +- fnet/src/vespa/fnet/frt/reflection.cpp | 85 ++++++++++---- fnet/src/vespa/fnet/frt/reflection.h | 49 ++------ fnet/src/vespa/fnet/frt/rpcrequest.cpp | 65 +++++++++-- fnet/src/vespa/fnet/frt/rpcrequest.h | 65 +++-------- fnet/src/vespa/fnet/frt/supervisor.cpp | 95 ++++++++------- fnet/src/vespa/fnet/frt/supervisor.h | 22 +++- fnet/src/vespa/fnet/frt/values.cpp | 82 ++++++++----- fnet/src/vespa/fnet/frt/values.h | 25 ++-- fnet/src/vespa/fnet/iocomponent.cpp | 16 ++- fnet/src/vespa/fnet/iocomponent.h | 20 +++- fnet/src/vespa/fnet/ipacketfactory.h | 5 +- fnet/src/vespa/fnet/packetqueue.h | 8 +- fnet/src/vespa/fnet/scheduler.cpp | 112 +++++++++++++++++- fnet/src/vespa/fnet/scheduler.h | 142 +++------------------- fnet/src/vespa/fnet/simplepacketstreamer.h | 2 + fnet/src/vespa/fnet/stats.h | 2 + fnet/src/vespa/fnet/transport.cpp | 58 ++++++++- fnet/src/vespa/fnet/transport.h | 64 ++++------ fnet/src/vespa/fnet/transport_thread.cpp | 13 ++- fnet/src/vespa/fnet/transport_thread.h | 13 ++- 49 files changed, 718 insertions(+), 844 deletions(-) delete mode 100644 fnet/src/tests/frt/memorytub/.gitignore delete mode 100644 fnet/src/tests/frt/memorytub/CMakeLists.txt delete mode 100644 fnet/src/tests/frt/memorytub/DESC delete mode 100644 fnet/src/tests/frt/memorytub/FILES delete mode 100644 fnet/src/tests/frt/memorytub/memorytub.cpp delete mode 100644 fnet/src/tests/regress/frt/memorytub/.gitignore create mode 100644 fnet/src/vespa/fnet/context.cpp delete mode 100644 fnet/src/vespa/fnet/frt/memorytub.cpp delete mode 100644 fnet/src/vespa/fnet/frt/memorytub.h (limited to 'fnet') diff --git a/fnet/CMakeLists.txt b/fnet/CMakeLists.txt index edf99050c43..b55d96ceaa4 100644 --- a/fnet/CMakeLists.txt +++ b/fnet/CMakeLists.txt @@ -19,7 +19,6 @@ vespa_define_module( src/tests/databuffer src/tests/examples src/tests/fdselector - src/tests/frt/memorytub src/tests/frt/method_pt src/tests/frt/parallel_rpc src/tests/frt/rpc diff --git a/fnet/src/examples/frt/rpc/rpc_proxy.cpp b/fnet/src/examples/frt/rpc/rpc_proxy.cpp index dd29255093a..9f6ae37ab2d 100644 --- a/fnet/src/examples/frt/rpc/rpc_proxy.cpp +++ b/fnet/src/examples/frt/rpc/rpc_proxy.cpp @@ -146,7 +146,7 @@ RPCProxy::HOOK_Mismatch(FRT_RPCRequest *req) GetSession(req)->server != NULL) { GetSession(req)->server->InvokeAsync(req, 60.0, - new (req->GetMemoryTub()) + new (req->getStash()) ReqDone(*this)); } else if (req->GetConnection()->IsClient() && GetSession(req)->client != NULL) @@ -154,7 +154,7 @@ RPCProxy::HOOK_Mismatch(FRT_RPCRequest *req) FRT_Supervisor::InvokeAsync(GetSession(req)->client->Owner(), GetSession(req)->client, req, 60.0, - new (req->GetMemoryTub()) + new (req->getStash()) ReqDone(*this)); } else { req->SetError(FRTE_RPC_CONNECTION); diff --git a/fnet/src/testlist.txt b/fnet/src/testlist.txt index 5932feee656..4aaef3ebfcb 100644 --- a/fnet/src/testlist.txt +++ b/fnet/src/testlist.txt @@ -3,7 +3,6 @@ tests/connection_spread tests/databuffer tests/examples tests/fdselector -tests/frt/memorytub tests/frt/method_pt tests/frt/parallel_rpc tests/frt/rpc diff --git a/fnet/src/tests/frt/memorytub/.gitignore b/fnet/src/tests/frt/memorytub/.gitignore deleted file mode 100644 index e61f6585695..00000000000 --- a/fnet/src/tests/frt/memorytub/.gitignore +++ /dev/null @@ -1,6 +0,0 @@ -*.core -.depend -Makefile -core -memorytub_test -fnet_memorytub_test_app diff --git a/fnet/src/tests/frt/memorytub/CMakeLists.txt b/fnet/src/tests/frt/memorytub/CMakeLists.txt deleted file mode 100644 index e33dc154212..00000000000 --- a/fnet/src/tests/frt/memorytub/CMakeLists.txt +++ /dev/null @@ -1,8 +0,0 @@ -# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -vespa_add_executable(fnet_memorytub_test_app TEST - SOURCES - memorytub.cpp - DEPENDS - fnet -) -vespa_add_test(NAME fnet_memorytub_test_app COMMAND fnet_memorytub_test_app) diff --git a/fnet/src/tests/frt/memorytub/DESC b/fnet/src/tests/frt/memorytub/DESC deleted file mode 100644 index b075fd81de5..00000000000 --- a/fnet/src/tests/frt/memorytub/DESC +++ /dev/null @@ -1 +0,0 @@ -Test the memorytub class. diff --git a/fnet/src/tests/frt/memorytub/FILES b/fnet/src/tests/frt/memorytub/FILES deleted file mode 100644 index 58e3592a833..00000000000 --- a/fnet/src/tests/frt/memorytub/FILES +++ /dev/null @@ -1 +0,0 @@ -memorytub.cpp diff --git a/fnet/src/tests/frt/memorytub/memorytub.cpp b/fnet/src/tests/frt/memorytub/memorytub.cpp deleted file mode 100644 index a6be26bc860..00000000000 --- a/fnet/src/tests/frt/memorytub/memorytub.cpp +++ /dev/null @@ -1,124 +0,0 @@ -// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include -#include - -//--------------------------------------------------------------- - -enum { - SMALL_ALLOCS = 90, - BIG_ALLOCS = 10, - ALLOCS = SMALL_ALLOCS + BIG_ALLOCS, - SMALL_SIZE = 407, - BIG_SIZE = 40700, - NOID = 99999 -}; - -//--------------------------------------------------------------- - -struct Fixture { - FRT_MemoryTub _tub; - char *_res[ALLOCS]; - uint32_t _i; - uint32_t _j; - Fixture() : _tub(), _res(), _i(0), _j(0) {} - bool overlap(char *start1, char *end1, - char *start2, char *end2); - bool inTub(char *pt, char *end); - bool notInTub(char *pt, char *end); -}; - -//--------------------------------------------------------------- - -bool -Fixture::overlap(char *start1, char *end1, - char *start2, char *end2) -{ - if (start1 == end1) - return false; - - if (start2 == end2) - return false; - - if (start2 >= start1 && start2 < end1) - return true; - - if (end2 > start1 && end2 <= end1) - return true; - - if (start1 >= start2 && start1 < end2) - return true; - - if (end1 > start2 && end1 <= end2) - return true; - - return false; -} - - -bool -Fixture::inTub(char *pt, char *end) -{ - for (char *p = pt; p < end; p++) - if (!_tub.InTub(p)) - return false; - return true; -} - - -bool -Fixture::notInTub(char *pt, char *end) -{ - for (char *p = pt; p < end; p++) - if (_tub.InTub(p)) - return false; - return true; -} - -//--------------------------------------------------------------- - -TEST_F("memory tub", Fixture()) { - for(f1._i = 0; f1._i < ALLOCS; f1._i++) - f1._res[f1._i] = NULL; - f1._i = NOID; - f1._j = NOID; - - EXPECT_TRUE(!f1._tub.InTub(&f1._tub)); - EXPECT_TRUE((uint32_t)SMALL_SIZE < (uint32_t)FRT_MemoryTub::ALLOC_LIMIT); - EXPECT_TRUE((uint32_t)BIG_SIZE > (uint32_t)FRT_MemoryTub::ALLOC_LIMIT); - EXPECT_TRUE((SMALL_SIZE * SMALL_ALLOCS) - > (FRT_MemoryTub::FIXED_SIZE + FRT_MemoryTub::CHUNK_SIZE)); - TEST_FLUSH(); - - for (f1._i = 0; f1._i < ALLOCS; f1._i++) { - uint32_t size_i = f1._i < SMALL_ALLOCS ? SMALL_SIZE : BIG_SIZE; - - f1._res[f1._i] = (char *) f1._tub.Alloc(size_i); - EXPECT_TRUE(((void *)f1._res[f1._i]) != ((void *)&f1._tub)); - memset(f1._res[f1._i], 0x55, size_i); - EXPECT_TRUE(f1.inTub(f1._res[f1._i], f1._res[f1._i] + size_i)); - } - TEST_FLUSH(); - - for (f1._i = 0; f1._i < ALLOCS; f1._i++) { - uint32_t size_i = f1._i < SMALL_ALLOCS ? SMALL_SIZE : BIG_SIZE; - EXPECT_TRUE(f1.inTub(f1._res[f1._i], f1._res[f1._i] + size_i)); - - for (f1._j = f1._i + 1; f1._j < ALLOCS; f1._j++) { - uint32_t size_j = f1._j < SMALL_ALLOCS ? SMALL_SIZE : BIG_SIZE; - EXPECT_TRUE(!f1.overlap(f1._res[f1._i], f1._res[f1._i] + size_i, - f1._res[f1._j], f1._res[f1._j] + size_j)); - } - } - TEST_FLUSH(); - - f1._tub.Reset(); - f1._j = NOID; - - for (f1._i = 0; f1._i < ALLOCS; f1._i++) { - uint32_t size_i = f1._i < SMALL_ALLOCS ? SMALL_SIZE : BIG_SIZE; - EXPECT_TRUE(!f1.inTub(f1._res[f1._i], f1._res[f1._i] + size_i)); - } - TEST_FLUSH(); -} - -TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/fnet/src/tests/frt/rpc/invoke.cpp b/fnet/src/tests/frt/rpc/invoke.cpp index 7983d2eb9d8..9034041947e 100644 --- a/fnet/src/tests/frt/rpc/invoke.cpp +++ b/fnet/src/tests/frt/rpc/invoke.cpp @@ -1,6 +1,7 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include #include +#include //------------------------------------------------------------- @@ -104,8 +105,8 @@ public: class EchoTest : public FRT_Invokable { private: - FRT_MemoryTub *_echo_tub; - FRT_Values *_echo_args; + vespalib::Stash *_echo_tub; + FRT_Values *_echo_args; EchoTest(const EchoTest &); EchoTest &operator=(const EchoTest &); @@ -120,7 +121,7 @@ public: void Init(FRT_Supervisor *supervisor) { - _echo_tub = new FRT_MemoryTub(); + _echo_tub = new vespalib::Stash(); _echo_args = new FRT_Values(_echo_tub); assert(_echo_tub != NULL && _echo_args != NULL); @@ -261,7 +262,7 @@ public: if (delay == 0) { req->Return(); } else { - new (req->GetMemoryTub()) DelayedReturn(_scheduler, + new (req->getStash()) DelayedReturn(_scheduler, req, ((double)delay) / 1000.0); } @@ -281,7 +282,7 @@ public: // block, but don't cripple server scheduler... // (NB: in 'real life', instant methods should never block) - FastOS_Time *now = _supervisor->GetTransport()->GetTimeSampler(); + FastOS_TimeInterface *now = _supervisor->GetTransport()->GetTimeSampler(); FNET_Scheduler *scheduler = _supervisor->GetScheduler(); assert(scheduler->GetTimeSampler() == now); diff --git a/fnet/src/tests/frt/rpc/sharedblob.cpp b/fnet/src/tests/frt/rpc/sharedblob.cpp index bb115f1c65f..a94a322d7cc 100644 --- a/fnet/src/tests/frt/rpc/sharedblob.cpp +++ b/fnet/src/tests/frt/rpc/sharedblob.cpp @@ -1,8 +1,11 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + #include #include #include +constexpr size_t ALLOC_LIMIT=1024; + struct MyBlob : FRT_ISharedBlob { int refcnt; @@ -16,8 +19,8 @@ struct MyBlob : FRT_ISharedBlob struct Data { enum { - SMALL = (FRT_MemoryTub::ALLOC_LIMIT / 2), - LARGE = (FRT_MemoryTub::ALLOC_LIMIT * 2) + SMALL = (ALLOC_LIMIT / 2), + LARGE = (ALLOC_LIMIT * 2) }; char *buf; diff --git a/fnet/src/tests/frt/values/values_test.cpp b/fnet/src/tests/frt/values/values_test.cpp index 01c43d8207d..28220f6d1d2 100644 --- a/fnet/src/tests/frt/values/values_test.cpp +++ b/fnet/src/tests/frt/values/values_test.cpp @@ -1,6 +1,10 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include -#include +#include +#include +#include + +using vespalib::Stash; uint8_t int8_arr[3] = { 1, 2, 3 }; uint16_t int16_arr[3] = { 2, 4, 6 }; @@ -159,12 +163,12 @@ void checkValues(FRT_Values &v1, FRT_Values &v2) { EXPECT_TRUE(v2.Equals(&v1)); } -TEST_FF("set and get", FRT_MemoryTub(), FRT_Values(&f1)) { +TEST_FF("set and get", Stash(), FRT_Values(&f1)) { fillValues(f2); checkValues(f2); } -TEST_FFFF("encode/decode big endian", FRT_MemoryTub(), FRT_Values(&f1), +TEST_FFFF("encode/decode big endian", Stash(), FRT_Values(&f1), FNET_DataBuffer(), FRT_Values(&f1)) { fillValues(f2); @@ -174,7 +178,7 @@ TEST_FFFF("encode/decode big endian", FRT_MemoryTub(), FRT_Values(&f1), checkValues(f2, f4); } -TEST_FFFF("encode/decode host endian", FRT_MemoryTub(), FRT_Values(&f1), +TEST_FFFF("encode/decode host endian", Stash(), FRT_Values(&f1), FNET_DataBuffer(), FRT_Values(&f1)) { fillValues(f2); @@ -184,7 +188,7 @@ TEST_FFFF("encode/decode host endian", FRT_MemoryTub(), FRT_Values(&f1), checkValues(f2, f4); } -TEST_FFFF("decode little if host is little", FRT_MemoryTub(), FRT_Values(&f1), +TEST_FFFF("decode little if host is little", Stash(), FRT_Values(&f1), FNET_DataBuffer(), FRT_Values(&f1)) { if (FNET_Info::GetEndian() == FNET_Info::ENDIAN_LITTLE) { @@ -199,7 +203,7 @@ TEST_FFFF("decode little if host is little", FRT_MemoryTub(), FRT_Values(&f1), } } -TEST_FF("print values", FRT_MemoryTub(), FRT_Values(&f1)) { +TEST_FF("print values", Stash(), FRT_Values(&f1)) { fillValues(f2); f2.Print(); } diff --git a/fnet/src/tests/printstuff/printstuff_test.cpp b/fnet/src/tests/printstuff/printstuff_test.cpp index 3778cef9c8c..d39cee5a44b 100644 --- a/fnet/src/tests/printstuff/printstuff_test.cpp +++ b/fnet/src/tests/printstuff/printstuff_test.cpp @@ -24,14 +24,14 @@ TEST("rpc packets in a queue") { { req->SetMethodName("foo"); FNET_PacketQueue_NoLock q1(1, FNET_IPacketHandler::FNET_KEEP_CHANNEL); - q1.QueuePacket_NoLock(new (req->GetMemoryTub()) FRT_RPCRequestPacket(req, 0, false), FNET_Context()); - q1.QueuePacket_NoLock(new (req->GetMemoryTub()) FRT_RPCReplyPacket(req, 0, false), FNET_Context()); - q1.QueuePacket_NoLock(new (req->GetMemoryTub()) FRT_RPCErrorPacket(req, 0, false), FNET_Context()); + q1.QueuePacket_NoLock(new (req->getStash()) FRT_RPCRequestPacket(req, 0, false), FNET_Context()); + q1.QueuePacket_NoLock(new (req->getStash()) FRT_RPCReplyPacket(req, 0, false), FNET_Context()); + q1.QueuePacket_NoLock(new (req->getStash()) FRT_RPCErrorPacket(req, 0, false), FNET_Context()); q1.Print(); FNET_PacketQueue q2(2, FNET_IPacketHandler::FNET_KEEP_CHANNEL); - q2.QueuePacket(new (req->GetMemoryTub()) FRT_RPCRequestPacket(req, 0, false), FNET_Context()); - q2.QueuePacket(new (req->GetMemoryTub()) FRT_RPCReplyPacket(req, 0, false), FNET_Context()); - q2.QueuePacket(new (req->GetMemoryTub()) FRT_RPCErrorPacket(req, 0, false), FNET_Context()); + q2.QueuePacket(new (req->getStash()) FRT_RPCRequestPacket(req, 0, false), FNET_Context()); + q2.QueuePacket(new (req->getStash()) FRT_RPCReplyPacket(req, 0, false), FNET_Context()); + q2.QueuePacket(new (req->getStash()) FRT_RPCErrorPacket(req, 0, false), FNET_Context()); q2.Print(); } req->SubRef(); diff --git a/fnet/src/tests/regress/frt/memorytub/.gitignore b/fnet/src/tests/regress/frt/memorytub/.gitignore deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/fnet/src/vespa/fnet/CMakeLists.txt b/fnet/src/vespa/fnet/CMakeLists.txt index 556cb69a988..61f07d08b53 100644 --- a/fnet/src/vespa/fnet/CMakeLists.txt +++ b/fnet/src/vespa/fnet/CMakeLists.txt @@ -7,6 +7,7 @@ vespa_add_library(fnet connect_thread.cpp connection.cpp connector.cpp + context.cpp controlpacket.cpp databuffer.cpp dummypacket.cpp diff --git a/fnet/src/vespa/fnet/channel.h b/fnet/src/vespa/fnet/channel.h index d2c798d0b5e..f9ce9d249a5 100644 --- a/fnet/src/vespa/fnet/channel.h +++ b/fnet/src/vespa/fnet/channel.h @@ -6,8 +6,6 @@ #include "ipackethandler.h" #include -#define FNET_NOID ((uint32_t)-1) - class FNET_Connection; class FNET_IPacketHandler; /** diff --git a/fnet/src/vespa/fnet/connection.cpp b/fnet/src/vespa/fnet/connection.cpp index 259702072cf..d5ccbca2617 100644 --- a/fnet/src/vespa/fnet/connection.cpp +++ b/fnet/src/vespa/fnet/connection.cpp @@ -1,12 +1,48 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include +#include "connection.h" +#include "dummypacket.h" +#include "channel.h" +#include "controlpacket.h" +#include "ipacketstreamer.h" +#include "iserveradapter.h" +#include "config.h" +#include "transport_thread.h" +#include + #include LOG_SETUP(".fnet"); -#include + +namespace { +class SyncPacket : public FNET_DummyPacket { +private: + FastOS_Cond _cond; + bool _done; + bool _waiting; + +public: + SyncPacket() + : _cond(), + _done(false), + _waiting(false) {} + + virtual ~SyncPacket() {} + + void WaitFree() { + _cond.Lock(); + _waiting = true; + while (!_done) + _cond.Wait(); + _waiting = false; + _cond.Unlock(); + } + + virtual void Free(); +}; + void -FNET_Connection::SyncPacket::Free() +SyncPacket::Free() { _cond.Lock(); _done = true; @@ -15,6 +51,8 @@ FNET_Connection::SyncPacket::Free() _cond.Unlock(); } +} + /////////////////////// // PROTECTED METHODS // /////////////////////// @@ -36,7 +74,6 @@ FNET_Connection::GetStateString(State state) } } - void FNET_Connection::SetState(State state) { @@ -360,7 +397,7 @@ FNET_Connection::Write(bool direct) FNET_Connection::FNET_Connection(FNET_TransportThread *owner, FNET_IPacketStreamer *streamer, FNET_IServerAdapter *serverAdapter, - FastOS_Socket *mySocket, + FastOS_SocketInterface *mySocket, const char *spec) : FNET_IOComponent(owner, mySocket, spec, /* time-out = */ true), _streamer(streamer), @@ -395,7 +432,7 @@ FNET_Connection::FNET_Connection(FNET_TransportThread *owner, FNET_IPacketHandler *adminHandler, FNET_Context adminContext, FNET_Context context, - FastOS_Socket *mySocket, + FastOS_SocketInterface *mySocket, const char *spec) : FNET_IOComponent(owner, mySocket, spec, /* time-out = */ true), _streamer(streamer), diff --git a/fnet/src/vespa/fnet/connection.h b/fnet/src/vespa/fnet/connection.h index 16ef66f0cf4..c66e349cb9b 100644 --- a/fnet/src/vespa/fnet/connection.h +++ b/fnet/src/vespa/fnet/connection.h @@ -3,6 +3,17 @@ #pragma once #include "ext_connectable.h" +#include "iocomponent.h" +#include "databuffer.h" +#include "context.h" +#include "channellookup.h" +#include "packetqueue.h" + +class FNET_IPacketStreamer; +class FNET_IServerAdapter; +class FastOS_SocketInterface; +class FNET_IPacketHandler; + /** * Interface implemented by objects that want to perform connection * cleanup. Use the SetCleanupHandler method to register with a @@ -53,35 +64,6 @@ public: FNET_WRITE_REDO = 10 }; -#ifndef IAM_DOXYGEN - class SyncPacket : public FNET_DummyPacket - { - private: - FNET_Cond _cond; - bool _done; - bool _waiting; - - public: - SyncPacket() - : _cond(), - _done(false), - _waiting(false) {} - virtual ~SyncPacket() {} - - void WaitFree() - { - _cond.Lock(); - _waiting = true; - while(!_done) - _cond.Wait(); - _waiting = false; - _cond.Unlock(); - } - - virtual void Free(); - }; -#endif // DOXYGEN - private: struct Flags { Flags() : @@ -100,7 +82,7 @@ private: FNET_IPacketStreamer *_streamer; // custom packet streamer FNET_IServerAdapter *_serverAdapter; // only on server side FNET_Channel *_adminChannel; // only on client side - FastOS_Socket *_socket; // socket for this conn + FastOS_SocketInterface *_socket; // socket for this conn FNET_Context _context; // connection context State _state; // connection state Flags _flags; // Packed flags. @@ -239,7 +221,7 @@ public: FNET_Connection(FNET_TransportThread *owner, FNET_IPacketStreamer *streamer, FNET_IServerAdapter *serverAdapter, - FastOS_Socket *mySocket, + FastOS_SocketInterface *mySocket, const char *spec); /** @@ -260,7 +242,7 @@ public: FNET_IPacketHandler *adminHandler, FNET_Context adminContext, FNET_Context context, - FastOS_Socket *mySocket, + FastOS_SocketInterface *mySocket, const char *spec); /** diff --git a/fnet/src/vespa/fnet/context.cpp b/fnet/src/vespa/fnet/context.cpp new file mode 100644 index 00000000000..24456466c04 --- /dev/null +++ b/fnet/src/vespa/fnet/context.cpp @@ -0,0 +1,12 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "context.h" +#include + +void +FNET_Context::Print(uint32_t indent) { + printf("%*sFNET_Context {\n", indent, ""); + printf("%*s Value[INT] : %d\n", indent, "", _value.INT); + printf("%*s Value[VOIDP]: %p\n", indent, "", _value.VOIDP); + printf("%*s}\n", indent, ""); +} diff --git a/fnet/src/vespa/fnet/context.h b/fnet/src/vespa/fnet/context.h index 782009e5bec..e06d3edfd6c 100644 --- a/fnet/src/vespa/fnet/context.h +++ b/fnet/src/vespa/fnet/context.h @@ -2,6 +2,10 @@ #pragma once +#include + +#define FNET_NOID ((uint32_t)-1) + class FNET_IOComponent; class FNET_Connector; class FNET_Connection; @@ -15,7 +19,7 @@ class FNET_IExecutable; class FNET_Context { public: - FNET_Context() : _value() { _value.VOIDP = NULL; } + FNET_Context() : _value() { _value.VOIDP = nullptr; } FNET_Context(uint32_t value) : _value() { _value.INT = value; } FNET_Context(void *value) : _value() { _value.VOIDP = value; } FNET_Context(FNET_Channel *value) : _value() { _value.CHANNEL = value; } @@ -39,12 +43,6 @@ public: FNET_IExecutable *EXECUTABLE; } _value; - void Print(uint32_t indent = 0) - { - printf("%*sFNET_Context {\n", indent, ""); - printf("%*s Value[INT] : %d\n", indent, "", _value.INT); - printf("%*s Value[VOIDP]: %p\n", indent, "", _value.VOIDP); - printf("%*s}\n", indent, ""); - } + void Print(uint32_t indent = 0); }; diff --git a/fnet/src/vespa/fnet/dummypacket.h b/fnet/src/vespa/fnet/dummypacket.h index e7db26277bf..87af112901c 100644 --- a/fnet/src/vespa/fnet/dummypacket.h +++ b/fnet/src/vespa/fnet/dummypacket.h @@ -2,6 +2,8 @@ #pragma once +#include "packet.h" + /** * A dummy packet is neither a regular packet nor a control * packet. The idea with this class is that you subclass it to make diff --git a/fnet/src/vespa/fnet/frt/CMakeLists.txt b/fnet/src/vespa/fnet/frt/CMakeLists.txt index 3b2693797c5..178a25f6f38 100644 --- a/fnet/src/vespa/fnet/frt/CMakeLists.txt +++ b/fnet/src/vespa/fnet/frt/CMakeLists.txt @@ -3,7 +3,6 @@ vespa_add_library(fnet_frt OBJECT SOURCES error.cpp invoker.cpp - memorytub.cpp packets.cpp reflection.cpp rpcrequest.cpp diff --git a/fnet/src/vespa/fnet/frt/frt.h b/fnet/src/vespa/fnet/frt/frt.h index 9c8441e4a1d..7a5d2700282 100644 --- a/fnet/src/vespa/fnet/frt/frt.h +++ b/fnet/src/vespa/fnet/frt/frt.h @@ -8,7 +8,6 @@ class FRT_IReturnHandler; class FRT_ICleanupHandler; class FRT_ISharedBlob; -class FRT_MemoryTub; class FRT_Method; class FRT_PacketFactory; class FRT_ReflectionBuilder; @@ -26,7 +25,6 @@ class FRT_Values; #include "error.h" #include "isharedblob.h" #include "invokable.h" -#include "memorytub.h" #include "values.h" #include "reflection.h" #include "rpcrequest.h" diff --git a/fnet/src/vespa/fnet/frt/invokable.h b/fnet/src/vespa/fnet/frt/invokable.h index 31112e5934d..15ac8691c7a 100644 --- a/fnet/src/vespa/fnet/frt/invokable.h +++ b/fnet/src/vespa/fnet/frt/invokable.h @@ -2,6 +2,8 @@ #pragma once +class FRT_RPCRequest; + class FRT_Invokable { public: diff --git a/fnet/src/vespa/fnet/frt/invoker.cpp b/fnet/src/vespa/fnet/frt/invoker.cpp index 6124bc4ed42..7630c2de13a 100644 --- a/fnet/src/vespa/fnet/frt/invoker.cpp +++ b/fnet/src/vespa/fnet/frt/invoker.cpp @@ -1,9 +1,30 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include +#include "invoker.h" +#include "supervisor.h" +#include + #include LOG_SETUP(".fnet.frt.invoker"); -#include + +FRT_SingleReqWait::FRT_SingleReqWait() + : _cond(), + _done(false), + _waiting(false) +{ } + +FRT_SingleReqWait::~FRT_SingleReqWait() {} + +void +FRT_SingleReqWait::WaitReq() +{ + _cond.Lock(); + _waiting = true; + while(!_done) + _cond.Wait(); + _waiting = false; + _cond.Unlock(); +} void @@ -42,6 +63,21 @@ FRT_RPCInvoker::FRT_RPCInvoker(FRT_Supervisor *supervisor, req->SetReturnHandler(this); } +bool FRT_RPCInvoker::IsInstant() { + return _method->IsInstant(); +} + +bool FRT_RPCInvoker::Invoke(bool freeChannel) +{ + bool detached = false; + _req->SetDetachedPT(&detached); + (_method->GetHandler()->*_method->GetMethod())(_req); + if (detached) + return false; + HandleDone(freeChannel); + return true; +} + void FRT_RPCInvoker::HandleDone(bool freeChannel) { @@ -92,6 +128,15 @@ FRT_RPCInvoker::Run(FastOS_ThreadInterface *, void *) //----------------------------------------------------------------------------- +void FRT_HookInvoker::Invoke() +{ + bool detached = false; + _req->SetDetachedPT(&detached); + (_hook->GetHandler()->*_hook->GetMethod())(_req); + assert(!detached); + _req->SubRef(); +} + void FRT_HookInvoker::HandleReturn() { diff --git a/fnet/src/vespa/fnet/frt/invoker.h b/fnet/src/vespa/fnet/frt/invoker.h index 060397604ee..d5106aca5c8 100644 --- a/fnet/src/vespa/fnet/frt/invoker.h +++ b/fnet/src/vespa/fnet/frt/invoker.h @@ -2,6 +2,14 @@ #pragma once +#include "rpcrequest.h" +#include +#include +#include +#include + +class FRT_Method; +class FRT_Supervisor; //----------------------------------------------------------------------------- class FRT_IRequestWait @@ -21,27 +29,15 @@ public: class FRT_SingleReqWait : public FRT_IRequestWait { private: - FNET_Cond _cond; + FastOS_Cond _cond; bool _done; bool _waiting; public: - FRT_SingleReqWait() - : _cond(), - _done(false), - _waiting(false) {} - virtual ~FRT_SingleReqWait() {} - - void WaitReq() - { - _cond.Lock(); - _waiting = true; - while(!_done) - _cond.Wait(); - _waiting = false; - _cond.Unlock(); - } + FRT_SingleReqWait(); + virtual ~FRT_SingleReqWait(); + void WaitReq(); virtual void RequestDone(FRT_RPCRequest *req); }; @@ -78,23 +74,12 @@ public: bool noReply); void ForceMethod(FRT_Method *method) { _method = method; } - bool IsInstant() { return _method->IsInstant(); } + bool IsInstant(); FRT_RPCRequest *GetRequest() { return _req; } void HandleDone(bool freeChannel); - - bool Invoke(bool freeChannel) - { - bool detached = false; - _req->SetDetachedPT(&detached); - (_method->GetHandler()->*_method->GetMethod())(_req); - if (detached) - return false; - HandleDone(freeChannel); - return true; - } - + bool Invoke(bool freeChannel); virtual void HandleReturn(); virtual FNET_Connection *GetConnection(); virtual void Run(FastOS_ThreadInterface *, void *); @@ -123,15 +108,7 @@ public: _req->SetReturnHandler(this); } - void Invoke() - { - bool detached = false; - _req->SetDetachedPT(&detached); - (_hook->GetHandler()->*_hook->GetMethod())(_req); - assert(!detached); - _req->SubRef(); - } - + void Invoke(); virtual void HandleReturn(); virtual FNET_Connection *GetConnection(); }; diff --git a/fnet/src/vespa/fnet/frt/isharedblob.h b/fnet/src/vespa/fnet/frt/isharedblob.h index f75122b8c48..084081a6816 100644 --- a/fnet/src/vespa/fnet/frt/isharedblob.h +++ b/fnet/src/vespa/fnet/frt/isharedblob.h @@ -2,6 +2,7 @@ #pragma once +#include class FRT_ISharedBlob { diff --git a/fnet/src/vespa/fnet/frt/memorytub.cpp b/fnet/src/vespa/fnet/frt/memorytub.cpp deleted file mode 100644 index 736c6572608..00000000000 --- a/fnet/src/vespa/fnet/frt/memorytub.cpp +++ /dev/null @@ -1,42 +0,0 @@ -// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include -#include - -void * -FRT_MemoryTub::SlowAlloc(size_t size) -{ - void *chunkMem = malloc(sizeof(Chunk) + CHUNK_SIZE); - assert(chunkMem != NULL); - _chunkHead = new (chunkMem) Chunk(CHUNK_SIZE, 0, _chunkHead); - return _chunkHead->Alloc(size); -} - - -void * -FRT_MemoryTub::BigAlloc(size_t size) -{ - void *ret = malloc(size); - assert(ret != NULL); - _allocHead = new (this) AllocInfo(_allocHead, size, ret); - return ret; -} - - -bool -FRT_MemoryTub::InTub(const void *pt) const -{ - const char *p = (const char *) pt; - - for (Chunk *chunk = _chunkHead; chunk != NULL; chunk = chunk->_next) - if (p >= chunk->_data && - p < chunk->_data + chunk->_used) - return true; - - for (AllocInfo *info = _allocHead; info != NULL; info = info->_next) - if (p >= (char *) info->_data && - p < (char *) info->_data + info->_size) - return true; - - return false; -} diff --git a/fnet/src/vespa/fnet/frt/memorytub.h b/fnet/src/vespa/fnet/frt/memorytub.h deleted file mode 100644 index abed57c75a6..00000000000 --- a/fnet/src/vespa/fnet/frt/memorytub.h +++ /dev/null @@ -1,149 +0,0 @@ -// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#pragma once - -#include -#include - - -class FRT_MemoryTub -{ - -public: - - enum { - CHUNK_SIZE = 32500, - FIXED_SIZE = 3880, - ALLOC_LIMIT = 3200 - }; - - struct AllocInfo { - AllocInfo *_next; - uint32_t _size; - void *_data; - - AllocInfo(AllocInfo *next, uint32_t size, void *data) - : _next(next), _size(size), _data(data) {} - - private: - AllocInfo(const AllocInfo &); - AllocInfo &operator=(const AllocInfo &); - }; - - struct Chunk { - uint32_t _size; - uint32_t _used; - Chunk *_next; - char *_data; - - Chunk(uint32_t size, uint32_t used, Chunk *next) - : _size(size), _used(used), _next(next), _data(NULL) - { - _data = reinterpret_cast(this + 1); - } - - void *Alloc(size_t size) - { - size_t alignedsize = ((size + (sizeof(char *) - 1)) - & ~(sizeof(char *) - 1)); - if (_used + alignedsize <= _size) { - void *ret = _data + _used; - _used += alignedsize; - return ret; - } - return NULL; - } - - private: - Chunk(const Chunk &); - Chunk &operator=(const Chunk &); - }; - -private: - - Chunk _fixedChunk; - char _fixedData[FIXED_SIZE]; - Chunk *_chunkHead; - AllocInfo *_allocHead; - - FRT_MemoryTub(const FRT_MemoryTub &); - FRT_MemoryTub &operator=(const FRT_MemoryTub &); - - void *SlowAlloc(size_t size); - void *BigAlloc(size_t size); - -public: - - FRT_MemoryTub() - : _fixedChunk(FIXED_SIZE, 0, NULL), - _chunkHead(&_fixedChunk), - _allocHead(NULL) - { - // Just to be sure - _fixedChunk._data = _fixedData; - } - - bool InTub(const void *pt) const; - - void *Alloc(size_t size) - { - if (size > ALLOC_LIMIT) - return BigAlloc(size); - void *tmp = _chunkHead->Alloc(size); - return (tmp != NULL) ? tmp : SlowAlloc(size); - }; - - char *CopyString(const char *str, uint32_t len) - { - char *pt = (char *) Alloc(len + 1); - memcpy(pt, str, len); - pt[len] = '\0'; - return pt; - } - - char *CopyData(const char *buf, uint32_t len) - { - char *pt = (char *) Alloc(len); - memcpy(pt, buf, len); - return pt; - } - - void Reset() - { - for (AllocInfo *info = _allocHead; - info != NULL; info = info->_next) { - free(info->_data); - } - _allocHead = NULL; - while (_chunkHead != &_fixedChunk) { - Chunk *tmp = _chunkHead; - _chunkHead = tmp->_next; - free(tmp); - } - _fixedChunk._used = 0; - } - - ~FRT_MemoryTub() - { - Reset(); - } -}; - - -inline void * -operator new(size_t size, FRT_MemoryTub *tub) -{ - return tub->Alloc(size); -} - - -inline void * -operator new[](size_t size, FRT_MemoryTub *tub) -{ - (void) size; - (void) tub; - fprintf(stderr, "Microsoft does not permit this operation!"); - abort(); - return malloc(size); -} - diff --git a/fnet/src/vespa/fnet/frt/packets.cpp b/fnet/src/vespa/fnet/frt/packets.cpp index 964293fae4f..93f2030669d 100644 --- a/fnet/src/vespa/fnet/frt/packets.cpp +++ b/fnet/src/vespa/fnet/frt/packets.cpp @@ -253,7 +253,7 @@ FRT_PacketFactory::CreatePacket(uint32_t pcode, FNET_Context context) if (req == NULL || (flags & ~FLAG_FRT_RPC_SUPPORTED_MASK) != 0) return NULL; - FRT_MemoryTub *tub = req->GetMemoryTub(); + vespalib::Stash *tub = req->getStash(); pcode &= 0xffff; // remove flags switch(pcode) { diff --git a/fnet/src/vespa/fnet/frt/reflection.cpp b/fnet/src/vespa/fnet/frt/reflection.cpp index 35e74200709..8bfa4507cd2 100644 --- a/fnet/src/vespa/fnet/frt/reflection.cpp +++ b/fnet/src/vespa/fnet/frt/reflection.cpp @@ -1,18 +1,61 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include -#include +#include "reflection.h" +#include "values.h" +#include "rpcrequest.h" +#include "supervisor.h" + +FRT_Method::FRT_Method(const char * name, const char * paramSpec, const char * returnSpec, + bool instant, FRT_METHOD_PT method, FRT_Invokable * handler) + : _hashNext(nullptr), + _listNext(nullptr), + _name(strdup(name)), + _paramSpec(strdup(paramSpec)), + _returnSpec(strdup(returnSpec)), + _instant(instant), + _method(method), + _handler(handler), + _docLen(0), + _doc(nullptr) +{ + assert(_name != nullptr); + assert(_paramSpec != nullptr); + assert(_returnSpec != nullptr); +} + +FRT_Method::~FRT_Method() { + free(_name); + free(_paramSpec); + free(_returnSpec); + free(_doc); +} + +void +FRT_Method::SetDocumentation(FRT_Values *values) { + free(_doc); + _docLen = values->GetLength(); + _doc = (char *) malloc(_docLen); + assert(_doc != nullptr); + + FNET_DataBuffer buf(_doc, _docLen); + values->EncodeCopy(&buf); +} +void +FRT_Method::GetDocumentation(FRT_Values *values) { + FNET_DataBuffer buf(_doc, _docLen); + buf.FreeToData(_docLen); + values->DecodeCopy(&buf, _docLen); +} FRT_ReflectionManager::FRT_ReflectionManager() : _numMethods(0), - _methods(NULL), + _methods(nullptr), _methodHash() { Reset(); } - FRT_ReflectionManager::~FRT_ReflectionManager() { Reset(); @@ -23,14 +66,14 @@ void FRT_ReflectionManager::Reset() { _numMethods = 0; - while (_methods != NULL) { + while (_methods != nullptr) { FRT_Method *method = _methods; _methods = method->GetNext(); delete(method); } for (uint32_t i = 0; i < METHOD_HASH_SIZE; i++) - _methodHash[i] = NULL; + _methodHash[i] = nullptr; } @@ -49,12 +92,12 @@ FRT_ReflectionManager::AddMethod(FRT_Method *method) FRT_Method * FRT_ReflectionManager::LookupMethod(const char *name) { - if (name == NULL) { - return NULL; + if (name == nullptr) { + return nullptr; } uint32_t hash = HashStr(name, METHOD_HASH_SIZE); FRT_Method *ret = _methodHash[hash]; - while (ret != NULL && strcmp(name, ret->GetName()) != 0) + while (ret != nullptr && strcmp(name, ret->GetName()) != 0) ret = ret->_hashNext; return ret; } @@ -67,7 +110,7 @@ FRT_ReflectionManager::DumpMethodList(FRT_Values *target) FRT_StringValue *args = target->AddStringArray(_numMethods); FRT_StringValue *ret = target->AddStringArray(_numMethods); uint32_t idx = 0; - for (FRT_Method *method = _methods; method != NULL; + for (FRT_Method *method = _methods; method != nullptr; method = method->GetNext(), idx++) { target->SetString(&names[idx], method->GetName()); target->SetString(&args[idx], method->GetParamSpec()); @@ -81,7 +124,7 @@ FRT_ReflectionManager::DumpMethodList(FRT_Values *target) void FRT_ReflectionBuilder::Flush() { - if (_method == NULL) + if (_method == nullptr) return; for (; _curArg < _argCnt; _curArg++) { @@ -94,7 +137,7 @@ FRT_ReflectionBuilder::Flush() } _method->SetDocumentation(_values); - _method = NULL; + _method = nullptr; _req->Reset(); } @@ -102,17 +145,17 @@ FRT_ReflectionBuilder::Flush() FRT_ReflectionBuilder::FRT_ReflectionBuilder(FRT_Supervisor *supervisor) : _supervisor(supervisor), _lookup(supervisor->GetReflectionManager()), - _method(NULL), + _method(nullptr), _req(supervisor->AllocRPCRequest()), _values(_req->GetReturn()), _argCnt(0), _retCnt(0), _curArg(0), _curRet(0), - _arg_name(NULL), - _arg_desc(NULL), - _ret_name(NULL), - _ret_desc(NULL) + _arg_name(nullptr), + _arg_desc(nullptr), + _ret_name(nullptr), + _ret_desc(nullptr) { } @@ -132,7 +175,7 @@ FRT_ReflectionBuilder::DefineMethod(const char *name, FRT_METHOD_PT method, FRT_Invokable *handler) { - if (handler == NULL) + if (handler == nullptr) return; Flush(); @@ -161,7 +204,7 @@ FRT_ReflectionBuilder::DefineMethod(const char *name, void FRT_ReflectionBuilder::MethodDesc(const char *desc) { - if (_method == NULL) + if (_method == nullptr) return; _values->SetString(&_values->GetValue(0)._string, desc); @@ -171,7 +214,7 @@ FRT_ReflectionBuilder::MethodDesc(const char *desc) void FRT_ReflectionBuilder::ParamDesc(const char *name, const char *desc) { - if (_method == NULL) + if (_method == nullptr) return; if (_curArg >= _argCnt) @@ -186,7 +229,7 @@ FRT_ReflectionBuilder::ParamDesc(const char *name, const char *desc) void FRT_ReflectionBuilder::ReturnDesc(const char *name, const char *desc) { - if (_method == NULL) + if (_method == nullptr) return; if (_curRet >= _retCnt) diff --git a/fnet/src/vespa/fnet/frt/reflection.h b/fnet/src/vespa/fnet/frt/reflection.h index 4f06d4beb45..fc64b6a69e7 100644 --- a/fnet/src/vespa/fnet/frt/reflection.h +++ b/fnet/src/vespa/fnet/frt/reflection.h @@ -2,6 +2,12 @@ #pragma once +#include "invokable.h" +#include + +class FRT_Values; +class FRT_Supervisor; +class FRT_StringValue; class FRT_Method { @@ -28,30 +34,9 @@ public: const char *returnSpec, bool instant, FRT_METHOD_PT method, - FRT_Invokable *handler) - : _hashNext(NULL), - _listNext(NULL), - _name(strdup(name)), - _paramSpec(strdup(paramSpec)), - _returnSpec(strdup(returnSpec)), - _instant(instant), - _method(method), - _handler(handler), - _docLen(0), - _doc(NULL) - { - assert(_name != NULL); - assert(_paramSpec != NULL); - assert(_returnSpec != NULL); - } + FRT_Invokable *handler); - ~FRT_Method() - { - free(_name); - free(_paramSpec); - free(_returnSpec); - free(_doc); - } + ~FRT_Method(); FRT_Method *GetNext() { return _listNext; } const char *GetName() { return _name; } @@ -60,22 +45,8 @@ public: bool IsInstant() { return _instant; } FRT_METHOD_PT GetMethod() { return _method; } FRT_Invokable *GetHandler() { return _handler; } - void SetDocumentation(FRT_Values *values) - { - free(_doc); - _docLen = values->GetLength(); - _doc = (char *) malloc(_docLen); - assert(_doc != NULL); - - FNET_DataBuffer buf(_doc, _docLen); - values->EncodeCopy(&buf); - } - void GetDocumentation(FRT_Values *values) - { - FNET_DataBuffer buf(_doc, _docLen); - buf.FreeToData(_docLen); - values->DecodeCopy(&buf, _docLen); - } + void SetDocumentation(FRT_Values *values); + void GetDocumentation(FRT_Values *values); }; //------------------------------------------------------------------------ diff --git a/fnet/src/vespa/fnet/frt/rpcrequest.cpp b/fnet/src/vespa/fnet/frt/rpcrequest.cpp index 390cd6fed9c..84a8fb48602 100644 --- a/fnet/src/vespa/fnet/frt/rpcrequest.cpp +++ b/fnet/src/vespa/fnet/frt/rpcrequest.cpp @@ -21,9 +21,7 @@ FRT_RPCRequest::FRT_RPCRequest() _abortHandler(NULL), _returnHandler(NULL), _cleanupHandler(NULL) -{ -} - +{ } FRT_RPCRequest::~FRT_RPCRequest() { @@ -35,8 +33,7 @@ FRT_RPCRequest::SetError(uint32_t errorCode, const char *errorMessage, uint32_t { _errorCode = errorCode; _errorMessageLen = errorMessageLen; - _errorMessage = _tub.CopyString(errorMessage, - errorMessageLen); + _errorMessage = fnet::copyString(_tub.alloc(errorMessageLen + 1), errorMessage, errorMessageLen); } void FRT_RPCRequest::SetError(uint32_t errorCode, const char *errorMessage) { @@ -44,20 +41,68 @@ FRT_RPCRequest::SetError(uint32_t errorCode, const char *errorMessage) { } void -FRT_RPCRequest::SetError(uint32_t errorCode) -{ +FRT_RPCRequest::SetError(uint32_t errorCode) { SetError(errorCode, FRT_GetDefaultErrorMessage(errorCode)); } +bool +FRT_RPCRequest::CheckReturnTypes(const char *types) { + if (IsError()) { + return false; + } + if (strcmp(types, GetReturnSpec()) != 0) { + SetError(FRTE_RPC_WRONG_RETURN); + return false; + } + return true; +} + void -FRT_RPCRequest::Reset() -{ +FRT_RPCRequest::SetMethodName(const char *methodName, uint32_t len) { + _methodNameLen = len; + _methodName = fnet::copyString(_tub.alloc(len + 1), methodName, len); +} +void +FRT_RPCRequest::SetMethodName(const char *methodName) { + SetMethodName(methodName, strlen(methodName)); +} + +bool +FRT_RPCRequest::Abort() { + if (_abortHandler == NULL) { + return false; + } + return _abortHandler->HandleAbort(); +} + +void +FRT_RPCRequest::Return() { + _returnHandler->HandleReturn(); +} + +FNET_Connection * +FRT_RPCRequest::GetConnection() { + if (_returnHandler == NULL) + return NULL; + return _returnHandler->GetConnection(); +} + +void +FRT_RPCRequest::Cleanup() { + if (_cleanupHandler != NULL) { + _cleanupHandler->HandleCleanup(); + _cleanupHandler = NULL; + } +} + +void +FRT_RPCRequest::Reset() { assert(_refcnt <= 1); Cleanup(); _context = FNET_Context(); _params.Reset(); _return.Reset(); - _tub.Reset(); + _tub.clear(); _errorCode = FRTE_NO_ERROR; _errorMessageLen = 0; _errorMessage = NULL; diff --git a/fnet/src/vespa/fnet/frt/rpcrequest.h b/fnet/src/vespa/fnet/frt/rpcrequest.h index 2d7ee0d8c7b..5846322e901 100644 --- a/fnet/src/vespa/fnet/frt/rpcrequest.h +++ b/fnet/src/vespa/fnet/frt/rpcrequest.h @@ -54,7 +54,8 @@ public: class FRT_RPCRequest { private: - FRT_MemoryTub _tub; + using Stash = vespalib::Stash; + Stash _tub; FNET_Context _context; FRT_Values _params; FRT_Values _return; @@ -94,7 +95,7 @@ public: void SetContext(FNET_Context context) { _context = context; } FNET_Context GetContext() { return _context; } - FRT_MemoryTub *GetMemoryTub() { return &_tub; } + Stash * getStash() { return &_tub; } FRT_Values *GetParams() { return &_params; } FRT_Values *GetReturn() { return &_return; } @@ -121,24 +122,10 @@ public: uint32_t GetErrorMessageLen() { return _errorMessageLen; } const char *GetErrorMessage() { return _errorMessage; } - bool CheckReturnTypes(const char *types) { - if (IsError()) { - return false; - } - if (strcmp(types, GetReturnSpec()) != 0) { - SetError(FRTE_RPC_WRONG_RETURN); - return false; - } - return true; - } + bool CheckReturnTypes(const char *types); - void SetMethodName(const char *methodName, uint32_t len) - { - _methodNameLen = len; - _methodName = _tub.CopyString(methodName, len); - } - void SetMethodName(const char *methodName) - { SetMethodName(methodName, strlen(methodName)); } + void SetMethodName(const char *methodName, uint32_t len); + void SetMethodName(const char *methodName); uint32_t GetMethodNameLen() { return _methodNameLen; } const char *GetMethodName() { return _methodName; } @@ -151,38 +138,12 @@ public: void SetDetachedPT(bool *detachedPT) { _detachedPT = detachedPT; } void Detach() { *_detachedPT = true; } - void SetAbortHandler(FRT_IAbortHandler *handler) - { _abortHandler = handler; } - void SetReturnHandler(FRT_IReturnHandler *handler) - { _returnHandler = handler; } - void SetCleanupHandler(FRT_ICleanupHandler *handler) - { _cleanupHandler = handler; } - - bool Abort() - { - if (_abortHandler == NULL) { - return false; - } - return _abortHandler->HandleAbort(); - } - - void Return() { - _returnHandler->HandleReturn(); - } - - FNET_Connection *GetConnection() - { - if (_returnHandler == NULL) - return NULL; - return _returnHandler->GetConnection(); - } + void SetAbortHandler(FRT_IAbortHandler *handler) { _abortHandler = handler; } + void SetReturnHandler(FRT_IReturnHandler *handler) { _returnHandler = handler; } + void SetCleanupHandler(FRT_ICleanupHandler *handler) { _cleanupHandler = handler; } - void Cleanup() - { - if (_cleanupHandler != NULL) { - _cleanupHandler->HandleCleanup(); - _cleanupHandler = NULL; - } - } + bool Abort(); + void Return(); + FNET_Connection *GetConnection(); + void Cleanup(); }; - diff --git a/fnet/src/vespa/fnet/frt/supervisor.cpp b/fnet/src/vespa/fnet/frt/supervisor.cpp index 7be527a9e64..9add08fe1d9 100644 --- a/fnet/src/vespa/fnet/frt/supervisor.cpp +++ b/fnet/src/vespa/fnet/frt/supervisor.cpp @@ -1,8 +1,13 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include -#include - +#include "supervisor.h" +#include "invoker.h" +#include "target.h" +#include +#include +#include +#include +#include FRT_Supervisor::FRT_Supervisor(FNET_Transport *transport, FastOS_ThreadPool *threadPool) @@ -11,11 +16,11 @@ FRT_Supervisor::FRT_Supervisor(FNET_Transport *transport, _standAlone(false), _packetFactory(), _packetStreamer(&_packetFactory), - _connector(NULL), + _connector(nullptr), _reflectionManager(), _rpcHooks(&_reflectionManager), _connHooks(*this), - _methodMismatchHook(NULL) + _methodMismatchHook(nullptr) { _rpcHooks.InitRPC(this); } @@ -23,22 +28,22 @@ FRT_Supervisor::FRT_Supervisor(FNET_Transport *transport, FRT_Supervisor::FRT_Supervisor(uint32_t threadStackSize, uint32_t maxThreads) - : _transport(NULL), - _threadPool(NULL), + : _transport(nullptr), + _threadPool(nullptr), _standAlone(true), _packetFactory(), _packetStreamer(&_packetFactory), - _connector(NULL), + _connector(nullptr), _reflectionManager(), _rpcHooks(&_reflectionManager), _connHooks(*this), - _methodMismatchHook(NULL) + _methodMismatchHook(nullptr) { _transport = new FNET_Transport(); - assert(_transport != NULL); + assert(_transport != nullptr); if (threadStackSize > 0) { _threadPool = new FastOS_ThreadPool(threadStackSize, maxThreads); - assert(_threadPool != NULL); + assert(_threadPool != nullptr); } _rpcHooks.InitRPC(this); } @@ -50,20 +55,22 @@ FRT_Supervisor::~FRT_Supervisor() delete _transport; delete _threadPool; } - if (_connector != NULL) { + if (_connector != nullptr) { _connector->SubRef(); } delete _methodMismatchHook; } +FNET_Scheduler * +FRT_Supervisor::GetScheduler() { return _transport->GetScheduler(); } bool FRT_Supervisor::Listen(const char *spec) { - if (_connector != NULL) + if (_connector != nullptr) return false; _connector = _transport->Listen(spec, &_packetStreamer, this); - return (_connector != NULL); + return (_connector != nullptr); } @@ -79,7 +86,7 @@ FRT_Supervisor::Listen(int port) uint32_t FRT_Supervisor::GetListenPort() const { - return (_connector != NULL) ? _connector->GetPortNumber() : 0; + return (_connector != nullptr) ? _connector->GetPortNumber() : 0; } @@ -88,8 +95,8 @@ FRT_Supervisor::RunInvocation(FRT_RPCInvoker *invoker) { // XXX: implement queue with max length + max # threads - if (_threadPool == NULL || - _threadPool->NewThread(invoker) == NULL) + if (_threadPool == nullptr || + _threadPool->NewThread(invoker) == nullptr) { invoker->GetRequest()->SetError(FRTE_RPC_OVERLOAD, "Could not start thread"); @@ -114,7 +121,7 @@ FRT_Supervisor::Get2WayTarget(const char *spec, FNET_Context connContext) FNET_TransportThread *thread = _transport->select_thread(spec, strlen(spec)); return new FRT_Target(thread->GetScheduler(), thread->Connect(spec, &_packetStreamer, - NULL, FNET_Context(), + nullptr, FNET_Context(), this, connContext)); } @@ -131,7 +138,7 @@ FRT_Supervisor::GetTarget(int port) FRT_RPCRequest * FRT_Supervisor::AllocRPCRequest(FRT_RPCRequest *tradein) { - if (tradein != NULL) { + if (tradein != nullptr) { if (tradein->Recycle()) { return tradein; } @@ -172,7 +179,7 @@ FRT_Supervisor::SetMethodMismatchHook(FRT_METHOD_PT method, delete _methodMismatchHook; _methodMismatchHook = new FRT_Method("frt.hook.methodMismatch", "*", "*", true, method, handler); - assert(_methodMismatchHook != NULL); + assert(_methodMismatchHook != nullptr); } @@ -180,7 +187,7 @@ void FRT_Supervisor::InvokeVoid(FNET_Connection *conn, FRT_RPCRequest *req) { - if (conn != NULL) { + if (conn != nullptr) { FNET_Channel *ch = conn->OpenChannel(); ch->Send(req->CreateRequestPacket(false)); ch->Free(); @@ -199,11 +206,11 @@ FRT_Supervisor::InvokeAsync(SchedulerPtr scheduler, { uint32_t chid; FNET_Packet *packet = req->CreateRequestPacket(true); - FRT_RPCAdapter *adapter = new (req->GetMemoryTub()) FRT_RPCAdapter(scheduler.ptr, req, waiter); - FNET_Channel *ch = (conn == NULL)? NULL : conn->OpenChannel(adapter, FNET_Context((void *)req), &chid); + FRT_RPCAdapter *adapter = new (req->getStash()) FRT_RPCAdapter(scheduler.ptr, req, waiter); + FNET_Channel *ch = (conn == nullptr)? nullptr : conn->OpenChannel(adapter, FNET_Context((void *)req), &chid); adapter->SetChannel(ch); - if (ch == NULL) { + if (ch == nullptr) { packet->Free(); req->SetError(FRTE_RPC_CONNECTION); adapter->ScheduleNow(); @@ -245,7 +252,7 @@ FRT_Supervisor::InitChannel(FNET_Channel *channel, uint32_t pcode) FRT_RPCRequest *req = AllocRPCRequest(); channel->SetHandler(this); channel->SetContext((void *)req); - if (req != NULL) { + if (req != nullptr) { req->SetContext(FNET_Context(channel)); rc = true; } @@ -259,7 +266,7 @@ FRT_Supervisor::HandlePacket(FNET_Packet *packet, FNET_Context context) { uint32_t pcode = packet->GetPCODE() & 0xffff; // remove flags FRT_RPCRequest *req = (FRT_RPCRequest *) context._value.VOIDP; - FRT_RPCInvoker *invoker = NULL; + FRT_RPCInvoker *invoker = nullptr; bool noReply = false; if (pcode == PCODE_FRT_RPC_REQUEST) { @@ -267,13 +274,13 @@ FRT_Supervisor::HandlePacket(FNET_Packet *packet, FNET_Context context) } else { req->SetError(FRTE_RPC_BAD_REQUEST); } - invoker = new (req->GetMemoryTub()) FRT_RPCInvoker(this, req, noReply); + invoker = new (req->getStash()) FRT_RPCInvoker(this, req, noReply); packet->Free(); if (req->IsError()) { if (req->GetErrorCode() != FRTE_RPC_BAD_REQUEST - && _methodMismatchHook != NULL) + && _methodMismatchHook != nullptr) { invoker->ForceMethod(_methodMismatchHook); return (invoker->Invoke(false)) ? @@ -303,7 +310,7 @@ bool FRT_Supervisor::Start() { assert(_standAlone); - if (_threadPool == NULL) + if (_threadPool == nullptr) return false; return _transport->Start(_threadPool); } @@ -406,7 +413,7 @@ FRT_Supervisor::RPCHooks::RPC_GetMethodInfo(FRT_RPCRequest *req) FRT_Values &arg = *req->GetParams(); FRT_Method *info = _reflectionManager->LookupMethod(arg[0]._string._str); - if (info != NULL) { + if (info != nullptr) { info->GetDocumentation(req->GetReturn()); } else { req->SetError(FRTE_RPC_METHOD_FAILED, "No such method"); @@ -419,9 +426,9 @@ FRT_Supervisor::RPCHooks::RPC_GetMethodInfo(FRT_RPCRequest *req) FRT_Supervisor::ConnHooks::ConnHooks(FRT_Supervisor &parent) : _parent(parent), - _sessionInitHook(NULL), - _sessionDownHook(NULL), - _sessionFiniHook(NULL) + _sessionInitHook(nullptr), + _sessionDownHook(nullptr), + _sessionFiniHook(nullptr) { } @@ -441,7 +448,7 @@ FRT_Supervisor::ConnHooks::SetSessionInitHook(FRT_METHOD_PT method, delete _sessionInitHook; _sessionInitHook = new FRT_Method("frt.hook.sessionInit", "", "", true, method, handler); - assert(_sessionInitHook != NULL); + assert(_sessionInitHook != nullptr); } @@ -452,7 +459,7 @@ FRT_Supervisor::ConnHooks::SetSessionDownHook(FRT_METHOD_PT method, delete _sessionDownHook; _sessionDownHook = new FRT_Method("frt.hook.sessionDown", "", "", true, method, handler); - assert(_sessionDownHook != NULL); + assert(_sessionDownHook != nullptr); } @@ -463,7 +470,7 @@ FRT_Supervisor::ConnHooks::SetSessionFiniHook(FRT_METHOD_PT method, delete _sessionFiniHook; _sessionFiniHook = new FRT_Method("frt.hook.sessionFini", "", "", true, method, handler); - assert(_sessionFiniHook != NULL); + assert(_sessionFiniHook != nullptr); } @@ -473,7 +480,7 @@ FRT_Supervisor::ConnHooks::InvokeHook(FRT_Method *hook, { FRT_RPCRequest *req = _parent.AllocRPCRequest(); req->SetMethodName(hook->GetName()); - (new (req->GetMemoryTub()) FRT_HookInvoker(req, hook, conn))->Invoke(); + (new (req->getStash()) FRT_HookInvoker(req, hook, conn))->Invoke(); } @@ -482,7 +489,7 @@ FRT_Supervisor::ConnHooks::InitAdminChannel(FNET_Channel *channel) { FNET_Connection *conn = channel->GetConnection(); conn->SetCleanupHandler(this); - if (_sessionInitHook != NULL) { + if (_sessionInitHook != nullptr) { InvokeHook(_sessionInitHook, conn); } channel->SetHandler(this); @@ -500,7 +507,7 @@ FRT_Supervisor::ConnHooks::HandlePacket(FNET_Packet *packet, return FNET_KEEP_CHANNEL; } FNET_Channel *ch = context._value.CHANNEL; - if (_sessionDownHook != NULL) { + if (_sessionDownHook != nullptr) { InvokeHook(_sessionDownHook, ch->GetConnection()); } return FNET_FREE_CHANNEL; @@ -510,7 +517,15 @@ FRT_Supervisor::ConnHooks::HandlePacket(FNET_Packet *packet, void FRT_Supervisor::ConnHooks::Cleanup(FNET_Connection *conn) { - if (_sessionFiniHook != NULL) { + if (_sessionFiniHook != nullptr) { InvokeHook(_sessionFiniHook, conn); } } + +FRT_Supervisor::SchedulerPtr::SchedulerPtr(FNET_Transport *transport) + : ptr(transport->GetScheduler()) +{ } + +FRT_Supervisor::SchedulerPtr::SchedulerPtr(FNET_TransportThread *transport_thread) + : ptr(transport_thread->GetScheduler()) +{ } diff --git a/fnet/src/vespa/fnet/frt/supervisor.h b/fnet/src/vespa/fnet/frt/supervisor.h index dbf7bd20bce..56c0abe6d5b 100644 --- a/fnet/src/vespa/fnet/frt/supervisor.h +++ b/fnet/src/vespa/fnet/frt/supervisor.h @@ -2,6 +2,20 @@ #pragma once +#include "invokable.h" +#include "packets.h" +#include "reflection.h" +#include +#include +#include +#include + +class FNET_Transport; +class FRT_Target; +class FastOS_ThreadPool; +class FNET_Scheduler; +class FRT_RPCInvoker; +class FRT_IRequestWait; class FRT_Supervisor : public FNET_IServerAdapter, public FNET_IPacketHandler @@ -76,7 +90,7 @@ public: bool StandAlone() { return _standAlone; } FNET_Transport *GetTransport() { return _transport; } - FNET_Scheduler *GetScheduler() { return _transport->GetScheduler(); } + FNET_Scheduler *GetScheduler(); FastOS_ThreadPool *GetThreadPool() { return _threadPool; } FRT_ReflectionManager *GetReflectionManager() { return &_reflectionManager; } @@ -102,10 +116,8 @@ public: FNET_Scheduler *ptr; SchedulerPtr(FNET_Scheduler *scheduler) : ptr(scheduler) {} - SchedulerPtr(FNET_Transport *transport) - : ptr(transport->GetScheduler()) {} - SchedulerPtr(FNET_TransportThread *transport_thread) - : ptr(transport_thread->GetScheduler()) {} + SchedulerPtr(FNET_Transport *transport); + SchedulerPtr(FNET_TransportThread *transport_thread); }; // methods for performing rpc invocations diff --git a/fnet/src/vespa/fnet/frt/values.cpp b/fnet/src/vespa/fnet/frt/values.cpp index d97dfd17dac..bb3ecd4db2d 100644 --- a/fnet/src/vespa/fnet/frt/values.cpp +++ b/fnet/src/vespa/fnet/frt/values.cpp @@ -9,7 +9,24 @@ static_assert(sizeof(uint8_t) == 1, "uint8_t must be 1 byte."); static_assert(sizeof(float) == sizeof(uint32_t), "float must be same size as uint32_t"); static_assert(sizeof(double) == sizeof(uint64_t), "double must be same size as uint64_t"); -FRT_Values::FRT_Values(FRT_MemoryTub *tub) +constexpr size_t SHARED_LIMIT = 1024; + +namespace fnet { + +char * copyString(char *dst, const char *src, size_t len) { + memcpy(dst, src, len); + dst[len] = '\0'; + return dst; +} + +char * copyData(char *dst, const void *src, size_t len) { + memcpy(dst, src, len); + return dst; +} + +} + +FRT_Values::FRT_Values(Stash *tub) : _maxValues(0), _numValues(0), _typeString(NULL), @@ -20,6 +37,15 @@ FRT_Values::FRT_Values(FRT_MemoryTub *tub) FRT_Values::~FRT_Values() { } +FRT_Values::LocalBlob::LocalBlob(const char *data, uint32_t len) : + _data(Alloc::alloc(len)), + _len(len) +{ + if (data != NULL) { + memcpy(_data.get(), data, len); + } +} + void FRT_Values::DiscardBlobs() { @@ -54,10 +80,10 @@ FRT_Values::EnsureFree(uint32_t need) if (cnt < 16) cnt = 16; - char *types = (char *) _tub->Alloc(cnt + 1); + char *types = (char *) _tub->alloc(cnt + 1); memcpy(types, _typeString, _numValues); memset(types + _numValues, FRT_VALUE_NONE, cnt + 1 - _numValues); - FRT_Value *values = (FRT_Value *) _tub->Alloc(cnt * sizeof(FRT_Value)); + FRT_Value *values = (FRT_Value *) _tub->alloc(cnt * sizeof(FRT_Value)); memcpy(values, _values, _numValues * sizeof(FRT_Value)); _maxValues = cnt; _typeString = types; @@ -67,7 +93,7 @@ FRT_Values::EnsureFree(uint32_t need) uint8_t * FRT_Values::AddInt8Array(uint32_t len) { EnsureFree(); - uint8_t *ret = (uint8_t *) _tub->Alloc(len * sizeof(uint8_t)); + uint8_t *ret = (uint8_t *) _tub->alloc(len * sizeof(uint8_t)); _values[_numValues]._int8_array._pt = ret; _values[_numValues]._int8_array._len = len; _typeString[_numValues++] = FRT_VALUE_INT8_ARRAY; @@ -77,7 +103,7 @@ FRT_Values::AddInt8Array(uint32_t len) { void FRT_Values::AddInt8Array(const uint8_t *array, uint32_t len) { EnsureFree(); - uint8_t *pt = (uint8_t *) _tub->Alloc(len * sizeof(uint8_t)); + uint8_t *pt = (uint8_t *) _tub->alloc(len * sizeof(uint8_t)); _values[_numValues]._int8_array._pt = pt; _values[_numValues]._int8_array._len = len; _typeString[_numValues++] = FRT_VALUE_INT8_ARRAY; @@ -87,7 +113,7 @@ FRT_Values::AddInt8Array(const uint8_t *array, uint32_t len) { uint16_t * FRT_Values::AddInt16Array(uint32_t len) { EnsureFree(); - uint16_t *ret = (uint16_t *) _tub->Alloc(len * sizeof(uint16_t)); + uint16_t *ret = (uint16_t *) _tub->alloc(len * sizeof(uint16_t)); _values[_numValues]._int16_array._pt = ret; _values[_numValues]._int16_array._len = len; _typeString[_numValues++] = FRT_VALUE_INT16_ARRAY; @@ -97,7 +123,7 @@ FRT_Values::AddInt16Array(uint32_t len) { void FRT_Values::AddInt16Array(const uint16_t *array, uint32_t len) { EnsureFree(); - uint16_t *pt = (uint16_t *) _tub->Alloc(len * sizeof(uint16_t)); + uint16_t *pt = (uint16_t *) _tub->alloc(len * sizeof(uint16_t)); _values[_numValues]._int16_array._pt = pt; _values[_numValues]._int16_array._len = len; _typeString[_numValues++] = FRT_VALUE_INT16_ARRAY; @@ -107,7 +133,7 @@ FRT_Values::AddInt16Array(const uint16_t *array, uint32_t len) { uint32_t * FRT_Values::AddInt32Array(uint32_t len) { EnsureFree(); - uint32_t *ret = (uint32_t *) _tub->Alloc(len * sizeof(uint32_t)); + uint32_t *ret = (uint32_t *) _tub->alloc(len * sizeof(uint32_t)); _values[_numValues]._int32_array._pt = ret; _values[_numValues]._int32_array._len = len; _typeString[_numValues++] = FRT_VALUE_INT32_ARRAY; @@ -117,7 +143,7 @@ FRT_Values::AddInt32Array(uint32_t len) { void FRT_Values::AddInt32Array(const uint32_t *array, uint32_t len) { EnsureFree(); - uint32_t *pt = (uint32_t *) _tub->Alloc(len * sizeof(uint32_t)); + uint32_t *pt = (uint32_t *) _tub->alloc(len * sizeof(uint32_t)); _values[_numValues]._int32_array._pt = pt; _values[_numValues]._int32_array._len = len; _typeString[_numValues++] = FRT_VALUE_INT32_ARRAY; @@ -127,7 +153,7 @@ FRT_Values::AddInt32Array(const uint32_t *array, uint32_t len) { uint64_t * FRT_Values::AddInt64Array(uint32_t len) { EnsureFree(); - uint64_t *ret = (uint64_t *) _tub->Alloc(len * sizeof(uint64_t)); + uint64_t *ret = (uint64_t *) _tub->alloc(len * sizeof(uint64_t)); _values[_numValues]._int64_array._pt = ret; _values[_numValues]._int64_array._len = len; _typeString[_numValues++] = FRT_VALUE_INT64_ARRAY; @@ -137,7 +163,7 @@ FRT_Values::AddInt64Array(uint32_t len) { void FRT_Values::AddInt64Array(const uint64_t *array, uint32_t len) { EnsureFree(); - uint64_t *pt = (uint64_t *) _tub->Alloc(len * sizeof(uint64_t)); + uint64_t *pt = (uint64_t *) _tub->alloc(len * sizeof(uint64_t)); _values[_numValues]._int64_array._pt = pt; _values[_numValues]._int64_array._len = len; _typeString[_numValues++] = FRT_VALUE_INT64_ARRAY; @@ -147,7 +173,7 @@ FRT_Values::AddInt64Array(const uint64_t *array, uint32_t len) { float * FRT_Values::AddFloatArray(uint32_t len) { EnsureFree(); - float *ret = (float *) _tub->Alloc(len * sizeof(float)); + float *ret = (float *) _tub->alloc(len * sizeof(float)); _values[_numValues]._float_array._pt = ret; _values[_numValues]._float_array._len = len; _typeString[_numValues++] = FRT_VALUE_FLOAT_ARRAY; @@ -157,7 +183,7 @@ FRT_Values::AddFloatArray(uint32_t len) { void FRT_Values::AddFloatArray(const float *array, uint32_t len) { EnsureFree(); - float *pt = (float *) _tub->Alloc(len * sizeof(float)); + float *pt = (float *) _tub->alloc(len * sizeof(float)); _values[_numValues]._float_array._pt = pt; _values[_numValues]._float_array._len = len; _typeString[_numValues++] = FRT_VALUE_FLOAT_ARRAY; @@ -167,7 +193,7 @@ FRT_Values::AddFloatArray(const float *array, uint32_t len) { double * FRT_Values::AddDoubleArray(uint32_t len) { EnsureFree(); - double *ret = (double *) _tub->Alloc(len * sizeof(double)); + double *ret = (double *) _tub->alloc(len * sizeof(double)); _values[_numValues]._double_array._pt = ret; _values[_numValues]._double_array._len = len; _typeString[_numValues++] = FRT_VALUE_DOUBLE_ARRAY; @@ -177,7 +203,7 @@ FRT_Values::AddDoubleArray(uint32_t len) { void FRT_Values::AddDoubleArray(const double *array, uint32_t len) { EnsureFree(); - double *pt = (double *) _tub->Alloc(len * sizeof(double)); + double *pt = (double *) _tub->alloc(len * sizeof(double)); _values[_numValues]._double_array._pt = pt; _values[_numValues]._double_array._len = len; _typeString[_numValues++] = FRT_VALUE_DOUBLE_ARRAY; @@ -187,7 +213,7 @@ FRT_Values::AddDoubleArray(const double *array, uint32_t len) { void FRT_Values::AddString(const char *str, uint32_t len) { EnsureFree(); - _values[_numValues]._string._str = _tub->CopyString(str, len); + _values[_numValues]._string._str = fnet::copyString(_tub->alloc(len+1), str, len); _values[_numValues]._string._len = len; _typeString[_numValues++] = FRT_VALUE_STRING; } @@ -195,7 +221,7 @@ FRT_Values::AddString(const char *str, uint32_t len) { char * FRT_Values::AddString(uint32_t len) { EnsureFree(); - char *ret = (char *) _tub->Alloc(len + 1); + char *ret = (char *) _tub->alloc(len + 1); _values[_numValues]._string._str = ret; _values[_numValues]._string._len = len; _typeString[_numValues++] = FRT_VALUE_STRING; @@ -205,7 +231,7 @@ FRT_Values::AddString(uint32_t len) { FRT_StringValue * FRT_Values::AddStringArray(uint32_t len) { EnsureFree(); - FRT_StringValue *ret = (FRT_StringValue *) _tub->Alloc(len * sizeof(FRT_StringValue)); + FRT_StringValue *ret = (FRT_StringValue *) _tub->alloc(len * sizeof(FRT_StringValue)); _values[_numValues]._string_array._pt = ret; _values[_numValues]._string_array._len = len; _typeString[_numValues++] = FRT_VALUE_STRING_ARRAY; @@ -228,24 +254,24 @@ FRT_Values::AddData(vespalib::alloc::Alloc buf, uint32_t len) { void FRT_Values::AddData(const char *buf, uint32_t len) { - if (len > FRT_MemoryTub::ALLOC_LIMIT) { + if (len > SHARED_LIMIT) { return AddSharedData(new (_tub) LocalBlob(buf, len)); } EnsureFree(); - _values[_numValues]._data._buf = _tub->CopyData(buf, len); + _values[_numValues]._data._buf = fnet::copyData(_tub->alloc(len), buf, len); _values[_numValues]._data._len = len; _typeString[_numValues++] = FRT_VALUE_DATA; } char * FRT_Values::AddData(uint32_t len) { - if (len > FRT_MemoryTub::ALLOC_LIMIT) { + if (len > SHARED_LIMIT) { LocalBlob *blob = new (_tub) LocalBlob(NULL, len); AddSharedData(blob); return blob->getInternalData(); } EnsureFree(); - char *ret = (char *) _tub->Alloc(len); + char *ret = (char *) _tub->alloc(len); _values[_numValues]._data._buf = ret; _values[_numValues]._data._len = len; _typeString[_numValues++] = FRT_VALUE_DATA; @@ -255,7 +281,7 @@ FRT_Values::AddData(uint32_t len) { FRT_DataValue * FRT_Values::AddDataArray(uint32_t len) { EnsureFree(); - FRT_DataValue *ret = (FRT_DataValue *) _tub->Alloc(len * sizeof(FRT_DataValue)); + FRT_DataValue *ret = (FRT_DataValue *) _tub->alloc(len * sizeof(FRT_DataValue)); _values[_numValues]._data_array._pt = ret; _values[_numValues]._data_array._len = len; _typeString[_numValues++] = FRT_VALUE_DATA_ARRAY; @@ -264,26 +290,24 @@ FRT_Values::AddDataArray(uint32_t len) { void FRT_Values::SetString(FRT_StringValue *value, const char *str, uint32_t len) { - value->_str = _tub->CopyString(str, len); + value->_str = fnet::copyString(_tub->alloc(len+1), str, len); value->_len = len; } void FRT_Values::SetString(FRT_StringValue *value, const char *str) { - uint32_t len = strlen(str); - value->_str = _tub->CopyString(str, len); - value->_len = len; + SetString(value, str, strlen(str)); } void FRT_Values::SetData(FRT_DataValue *value, const char *buf, uint32_t len) { char *mybuf = NULL; - if (len > FRT_MemoryTub::ALLOC_LIMIT) { + if (len > SHARED_LIMIT) { LocalBlob *blob = new (_tub) LocalBlob(buf, len); _blobs = new (_tub) BlobRef(value, 0, blob, _blobs); mybuf = blob->getInternalData(); } else { - mybuf = _tub->CopyData(buf, len); + mybuf = fnet::copyData(_tub->alloc(len), buf, len); } value->_buf = mybuf; value->_len = len; diff --git a/fnet/src/vespa/fnet/frt/values.h b/fnet/src/vespa/fnet/frt/values.h index cc3f2d3f4c8..a5d0e730884 100644 --- a/fnet/src/vespa/fnet/frt/values.h +++ b/fnet/src/vespa/fnet/frt/values.h @@ -2,9 +2,14 @@ #pragma once -#include "memorytub.h" #include "isharedblob.h" +#include +#include +namespace fnet { + char * copyString(char *dst, const char *src, size_t len); + char * copyData(char *dst, const void *src, size_t len); +} class FNET_DataBuffer; template @@ -74,22 +79,16 @@ union FRT_Value class FRT_Values { public: + using Stash = vespalib::Stash; + using Alloc = vespalib::alloc::Alloc; class LocalBlob : public FRT_ISharedBlob { - using Alloc = vespalib::alloc::Alloc; public: LocalBlob(Alloc data, uint32_t len) : _data(std::move(data)), _len(len) { } - LocalBlob(const char *data, uint32_t len) : - _data(Alloc::alloc(len)), - _len(len) - { - if (data != NULL) { - memcpy(_data.get(), data, len); - } - } + LocalBlob(const char *data, uint32_t len); void addRef() override {} void subRef() override { Alloc().swap(_data); } uint32_t getLen() override { return _len; } @@ -124,12 +123,12 @@ private: char *_typeString; FRT_Value *_values; BlobRef *_blobs; - FRT_MemoryTub *_tub; + Stash *_tub; public: FRT_Values(const FRT_Values &) = delete; FRT_Values &operator=(const FRT_Values &) = delete; - FRT_Values(FRT_MemoryTub *tub); + FRT_Values(Stash *tub); ~FRT_Values(); void DiscardBlobs(); @@ -251,7 +250,7 @@ public: char *AddString(uint32_t len); FRT_StringValue *AddStringArray(uint32_t len); void AddSharedData(FRT_ISharedBlob *blob); - void AddData(vespalib::alloc::Alloc buf, uint32_t len); + void AddData(Alloc buf, uint32_t len); void AddData(const char *buf, uint32_t len); char *AddData(uint32_t len); FRT_DataValue *AddDataArray(uint32_t len); diff --git a/fnet/src/vespa/fnet/iocomponent.cpp b/fnet/src/vespa/fnet/iocomponent.cpp index b524298a121..c41a0826b83 100644 --- a/fnet/src/vespa/fnet/iocomponent.cpp +++ b/fnet/src/vespa/fnet/iocomponent.cpp @@ -1,11 +1,12 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include -#include +#include "iocomponent.h" +#include "transport_thread.h" +#include FNET_IOComponent::FNET_IOComponent(FNET_TransportThread *owner, - FastOS_Socket *mysocket, + FastOS_SocketInterface *mysocket, const char *spec, bool shouldTimeOut) : _ioc_next(NULL), @@ -31,6 +32,15 @@ FNET_IOComponent::~FNET_IOComponent() free(_ioc_spec); } +FNET_Config * +FNET_IOComponent::GetConfig() { + return _ioc_owner->GetConfig(); +} + +void +FNET_IOComponent::UpdateTimeOut() { + _ioc_owner->UpdateTimeOut(this); +} void FNET_IOComponent::AddRef() diff --git a/fnet/src/vespa/fnet/iocomponent.h b/fnet/src/vespa/fnet/iocomponent.h index 5de7c11a77c..efd106e0473 100644 --- a/fnet/src/vespa/fnet/iocomponent.h +++ b/fnet/src/vespa/fnet/iocomponent.h @@ -2,7 +2,15 @@ #pragma once -#include +#include "stats.h" +#include +#include + +class FNET_TransportThread; +class FNET_StatCounters; +class FastOS_SocketInterface; +class FNET_Config; +class FastOS_SocketEvent; /** * This is the common superclass of all components that may be part of @@ -38,11 +46,11 @@ protected: FNET_IOComponent *_ioc_prev; // prev in list FNET_TransportThread *_ioc_owner; // owner(TransportThread) ref. FNET_StatCounters *_ioc_counters; // stat counters - FastOS_Socket *_ioc_socket; // source of events. + FastOS_SocketInterface *_ioc_socket; // source of events. char *_ioc_spec; // connect/listen spec Flags _flags; // Compressed representation of boolean flags; fastos::TimeStamp _ioc_timestamp; // last I/O activity - FNET_Cond _ioc_cond; // synchronization + FastOS_Cond _ioc_cond; // synchronization uint32_t _ioc_refcnt; // reference counter // direct write stats kept locally @@ -63,7 +71,7 @@ public: * @param spec listen/connect spec for this IOC * @param shouldTimeOut should this IOC time out if idle ? **/ - FNET_IOComponent(FNET_TransportThread *owner, FastOS_Socket *mysocket, + FNET_IOComponent(FNET_TransportThread *owner, FastOS_SocketInterface *mysocket, const char *spec, bool shouldTimeOut); @@ -160,7 +168,7 @@ public: * * @return config object. **/ - FNET_Config *GetConfig() { return _ioc_owner->GetConfig(); } + FNET_Config *GetConfig(); /** @@ -174,7 +182,7 @@ public: * proxy-call to the owning transport object, calling * FNET_TransportThread::UpdateTimeOut() with itself as parameter. **/ - void UpdateTimeOut() { _ioc_owner->UpdateTimeOut(this); } + void UpdateTimeOut(); /** diff --git a/fnet/src/vespa/fnet/ipacketfactory.h b/fnet/src/vespa/fnet/ipacketfactory.h index 929f95f0a0c..82881b67fb6 100644 --- a/fnet/src/vespa/fnet/ipacketfactory.h +++ b/fnet/src/vespa/fnet/ipacketfactory.h @@ -2,6 +2,8 @@ #pragma once +#include "context.h" + /** * Interface describing objects that are able to create packets. An * object implementing this interface is needed in order to use the @@ -28,7 +30,6 @@ public: * for the channel that will receive the packet after it is * created and un-streamed. **/ - virtual FNET_Packet *CreatePacket(uint32_t pcode, - FNET_Context context) = 0; + virtual FNET_Packet *CreatePacket(uint32_t pcode, FNET_Context context) = 0; }; diff --git a/fnet/src/vespa/fnet/packetqueue.h b/fnet/src/vespa/fnet/packetqueue.h index baf00c24cb9..b36b899a05f 100644 --- a/fnet/src/vespa/fnet/packetqueue.h +++ b/fnet/src/vespa/fnet/packetqueue.h @@ -2,6 +2,9 @@ #pragma once +#include "ipackethandler.h" +#include + /** * This class implements a queue of packets. Being in a queue does not * affect the packet's internal data. This is the superclass of the @@ -172,7 +175,7 @@ private: protected: - FNET_Cond _cond; + FastOS_Cond _cond; uint32_t _waitCnt; @@ -185,8 +188,7 @@ public: * @param hpRetCode the value that should be returned when used * as a packet handler. Default is FNET_KEEP_CHANNEL. **/ - FNET_PacketQueue(uint32_t len = 64, - HP_RetCode hpRetCode = FNET_KEEP_CHANNEL); + FNET_PacketQueue(uint32_t len = 64, HP_RetCode hpRetCode = FNET_KEEP_CHANNEL); virtual ~FNET_PacketQueue(); diff --git a/fnet/src/vespa/fnet/scheduler.cpp b/fnet/src/vespa/fnet/scheduler.cpp index 63d107bc334..56426a5aa92 100644 --- a/fnet/src/vespa/fnet/scheduler.cpp +++ b/fnet/src/vespa/fnet/scheduler.cpp @@ -1,11 +1,11 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include +#include "scheduler.h" +#include "task.h" + #include LOG_SETUP(".fnet.scheduler"); -#include -#include -#include + FNET_Scheduler::FNET_Scheduler(FastOS_Time *sampler, FastOS_Time *now) @@ -173,3 +173,107 @@ FNET_Scheduler::CheckTasks() } Unlock(); } + +void +FNET_Scheduler::FirstTask(uint32_t slot) { + _currPt = _slots[slot]; + _tailPt = (_currPt != NULL) ? + _currPt->_task_prev : NULL; +} + +void +FNET_Scheduler::NextTask() { + _currPt = (_currPt != _tailPt) ? + _currPt->_task_next : NULL; +} + +void +FNET_Scheduler::AdjustCurrPt() { + _currPt = (_currPt != _tailPt) ? + _currPt->_task_next : NULL; +} + +void +FNET_Scheduler::AdjustTailPt() { + _tailPt = _tailPt->_task_prev; +} + +void +FNET_Scheduler::LinkIn(FNET_Task *task) { + FNET_Task **head = &(_slots[task->_task_slot]); + + if ((*head) == NULL) { + (*head) = task; + task->_task_next = task; + task->_task_prev = task; + } else { + task->_task_next = (*head); + task->_task_prev = (*head)->_task_prev; + (*head)->_task_prev->_task_next = task; + (*head)->_task_prev = task; + } +} + +void +FNET_Scheduler::LinkOut(FNET_Task *task) { + FNET_Task **head = &(_slots[task->_task_slot]); + + if (task == _currPt) + AdjustCurrPt(); + else if (task == _tailPt) + AdjustTailPt(); + + if (task->_task_next == task) { + (*head) = NULL; + } else { + task->_task_prev->_task_next = task->_task_next; + task->_task_next->_task_prev = task->_task_prev; + if ((*head) == task) + (*head) = task->_task_next; + } + task->_task_next = NULL; + task->_task_prev = NULL; +} + +void +FNET_Scheduler::BeforeTask(FNET_Task *task) { + _performing = task; + Unlock(); +} + +void +FNET_Scheduler::AfterTask() { + Lock(); + _performing = NULL; + if (_waitTask) { + _waitTask = false; + Broadcast(); + } +} + +void +FNET_Scheduler::WaitTask(FNET_Task *task) { + while (IsPerforming(task)) { + _waitTask = true; + Wait(); + } +} + +void +FNET_Scheduler::PerformTasks(uint32_t slot, uint32_t iter) { + FirstTask(slot); + for (FNET_Task *task; (task = GetTask()) != NULL; ) { + NextTask(); + + if (task->_task_iter == iter) { + LinkOut(task); + BeforeTask(task); + task->PerformTask(); // PERFORM TASK + AfterTask(); + } + } +} + +bool FNET_Scheduler::IsActive(FNET_Task *task) { + return task->_task_next != NULL; +} \ No newline at end of file diff --git a/fnet/src/vespa/fnet/scheduler.h b/fnet/src/vespa/fnet/scheduler.h index af7044f3c4f..7636b16ae9a 100644 --- a/fnet/src/vespa/fnet/scheduler.h +++ b/fnet/src/vespa/fnet/scheduler.h @@ -3,6 +3,9 @@ #pragma once #include +#include + +class FNET_Task; /** * An object of this class handles scheduling of @ref FNET_Task @@ -24,7 +27,7 @@ public: }; private: - FNET_Cond _cond; + FastOS_Cond _cond; FNET_Task *_slots[NUM_SLOTS + 1]; FastOS_Time _next; FastOS_Time _now; @@ -44,129 +47,20 @@ private: void Wait() { _cond.Wait(); } void Broadcast() { _cond.Broadcast(); } - - FNET_Task *GetTask() - { - return _currPt; - } - - - void FirstTask(uint32_t slot) - { - _currPt = _slots[slot]; - _tailPt = (_currPt != NULL) ? - _currPt->_task_prev : NULL; - } - - - void NextTask() - { - _currPt = (_currPt != _tailPt) ? - _currPt->_task_next : NULL; - } - - - void AdjustCurrPt() - { - _currPt = (_currPt != _tailPt) ? - _currPt->_task_next : NULL; - } - - - void AdjustTailPt() - { - _tailPt = _tailPt->_task_prev; - } - - - void LinkIn(FNET_Task *task) - { - FNET_Task **head = &(_slots[task->_task_slot]); - - if ((*head) == NULL) { - (*head) = task; - task->_task_next = task; - task->_task_prev = task; - } else { - task->_task_next = (*head); - task->_task_prev = (*head)->_task_prev; - (*head)->_task_prev->_task_next = task; - (*head)->_task_prev = task; - } - } - - - void LinkOut(FNET_Task *task) - { - FNET_Task **head = &(_slots[task->_task_slot]); - - if (task == _currPt) - AdjustCurrPt(); - else if (task == _tailPt) - AdjustTailPt(); - - if (task->_task_next == task) { - (*head) = NULL; - } else { - task->_task_prev->_task_next = task->_task_next; - task->_task_next->_task_prev = task->_task_prev; - if ((*head) == task) - (*head) = task->_task_next; - } - task->_task_next = NULL; - task->_task_prev = NULL; - } - - - static bool IsActive(FNET_Task *task) - { return task->_task_next != NULL; } - - - bool IsPerforming(FNET_Task *task) - { return task == _performing; } - - - void BeforeTask(FNET_Task *task) - { - _performing = task; - Unlock(); - } - - - void AfterTask() - { - Lock(); - _performing = NULL; - if (_waitTask) { - _waitTask = false; - Broadcast(); - } - } - - - void WaitTask(FNET_Task *task) - { - while (IsPerforming(task)) { - _waitTask = true; - Wait(); - } - } - - - void PerformTasks(uint32_t slot, uint32_t iter) - { - FirstTask(slot); - for (FNET_Task *task; (task = GetTask()) != NULL; ) { - NextTask(); - - if (task->_task_iter == iter) { - LinkOut(task); - BeforeTask(task); - task->PerformTask(); // PERFORM TASK - AfterTask(); - } - } - } + FNET_Task *GetTask() { return _currPt; } + + void FirstTask(uint32_t slot); + void NextTask(); + void AdjustCurrPt(); + void AdjustTailPt(); + void LinkIn(FNET_Task *task); + void LinkOut(FNET_Task *task); + bool IsPerforming(FNET_Task *task) { return task == _performing; } + void BeforeTask(FNET_Task *task); + void AfterTask(); + void WaitTask(FNET_Task *task); + void PerformTasks(uint32_t slot, uint32_t iter); + bool IsActive(FNET_Task *task); public: diff --git a/fnet/src/vespa/fnet/simplepacketstreamer.h b/fnet/src/vespa/fnet/simplepacketstreamer.h index 30719559fda..b075206107b 100644 --- a/fnet/src/vespa/fnet/simplepacketstreamer.h +++ b/fnet/src/vespa/fnet/simplepacketstreamer.h @@ -2,6 +2,8 @@ #pragma once +#include "ipacketstreamer.h" + /** * This is a convenience class. Large applications may want to * implement the functionality offered by this class themselves to diff --git a/fnet/src/vespa/fnet/stats.h b/fnet/src/vespa/fnet/stats.h index f7ddf37c1b2..f72af4e979c 100644 --- a/fnet/src/vespa/fnet/stats.h +++ b/fnet/src/vespa/fnet/stats.h @@ -2,6 +2,8 @@ #pragma once +#include + /** * This class is used internally by @ref FNET_Transport objects to * aggregate FNET statistics. The actual statistics are located in the diff --git a/fnet/src/vespa/fnet/transport.cpp b/fnet/src/vespa/fnet/transport.cpp index 2eebc12c150..129017b53f7 100644 --- a/fnet/src/vespa/fnet/transport.cpp +++ b/fnet/src/vespa/fnet/transport.cpp @@ -1,7 +1,9 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include "fnet.h" +#include "transport.h" #include "connect_thread.h" +#include "transport_thread.h" +#include "iocomponent.h" #include namespace { @@ -165,3 +167,57 @@ FNET_Transport::Start(FastOS_ThreadPool *pool) } return result; } + +void +FNET_Transport::Add(FNET_IOComponent *comp, bool needRef) { + comp->Owner()->Add(comp, needRef); +} + +void +FNET_Transport::EnableRead(FNET_IOComponent *comp, bool needRef) { + comp->Owner()->EnableRead(comp, needRef); +} + +void +FNET_Transport::DisableRead(FNET_IOComponent *comp, bool needRef) { + comp->Owner()->DisableRead(comp, needRef); +} + +void +FNET_Transport::EnableWrite(FNET_IOComponent *comp, bool needRef) { + comp->Owner()->EnableWrite(comp, needRef); +} + +void +FNET_Transport::DisableWrite(FNET_IOComponent *comp, bool needRef) { + comp->Owner()->DisableWrite(comp, needRef); +} + +void +FNET_Transport::Close(FNET_IOComponent *comp, bool needRef) { + comp->Owner()->Close(comp, needRef); +} + +FastOS_TimeInterface * +FNET_Transport::GetTimeSampler() { + assert(_threads.size() == 1); + return _threads[0]->GetTimeSampler(); +} + +bool +FNET_Transport::InitEventLoop() { + assert(_threads.size() == 1); + return _threads[0]->InitEventLoop(); +} + +bool +FNET_Transport::EventLoopIteration() { + assert(_threads.size() == 1); + return _threads[0]->EventLoopIteration(); +} + +void +FNET_Transport::Main() { + assert(_threads.size() == 1); + _threads[0]->Main(); +} \ No newline at end of file diff --git a/fnet/src/vespa/fnet/transport.h b/fnet/src/vespa/fnet/transport.h index e3ef3edf4bc..53f84d3a9a9 100644 --- a/fnet/src/vespa/fnet/transport.h +++ b/fnet/src/vespa/fnet/transport.h @@ -3,6 +3,18 @@ #pragma once #include "ext_connectable.h" +#include "context.h" +#include +#include + +class FastOS_TimeInterface; +class FNET_TransportThread; +class FastOS_ThreadPool; +class FNET_Connector; +class FNET_IPacketStreamer; +class FNET_IServerAdapter; +class FNET_IPacketHandler; +class FNET_Scheduler; namespace fnet { class ConnectThread; @@ -246,52 +258,20 @@ public: // forward async IO Component operations to their owners //------------------------------------------------------------------------- - static void Add(FNET_IOComponent *comp, bool needRef = true) { - comp->Owner()->Add(comp, needRef); - } - - static void EnableRead(FNET_IOComponent *comp, bool needRef = true) { - comp->Owner()->EnableRead(comp, needRef); - } - - static void DisableRead(FNET_IOComponent *comp, bool needRef = true) { - comp->Owner()->DisableRead(comp, needRef); - } - - static void EnableWrite(FNET_IOComponent *comp, bool needRef = true) { - comp->Owner()->EnableWrite(comp, needRef); - } - - static void DisableWrite(FNET_IOComponent *comp, bool needRef = true) { - comp->Owner()->DisableWrite(comp, needRef); - } - - static void Close(FNET_IOComponent *comp, bool needRef = true) { - comp->Owner()->Close(comp, needRef); - } + static void Add(FNET_IOComponent *comp, bool needRef = true); + static void EnableRead(FNET_IOComponent *comp, bool needRef = true); + static void DisableRead(FNET_IOComponent *comp, bool needRef = true); + static void EnableWrite(FNET_IOComponent *comp, bool needRef = true); + static void DisableWrite(FNET_IOComponent *comp, bool needRef = true); + static void Close(FNET_IOComponent *comp, bool needRef = true); //------------------------------------------------------------------------- // single-threaded API forwarding. num_threads must be 1. Note: Choose // only one of: (a) Start, (b) Main, (c) InitEventLoop + EventLoopIteration // ------------------------------------------------------------------------- - FastOS_Time *GetTimeSampler() { - assert(_threads.size() == 1); - return _threads[0]->GetTimeSampler(); - } - - bool InitEventLoop() { - assert(_threads.size() == 1); - return _threads[0]->InitEventLoop(); - } - - bool EventLoopIteration() { - assert(_threads.size() == 1); - return _threads[0]->EventLoopIteration(); - } - - void Main() { - assert(_threads.size() == 1); - _threads[0]->Main(); - } + FastOS_TimeInterface *GetTimeSampler(); + bool InitEventLoop(); + bool EventLoopIteration(); + void Main(); }; diff --git a/fnet/src/vespa/fnet/transport_thread.cpp b/fnet/src/vespa/fnet/transport_thread.cpp index 9efdd087321..6b99c92ba73 100644 --- a/fnet/src/vespa/fnet/transport_thread.cpp +++ b/fnet/src/vespa/fnet/transport_thread.cpp @@ -1,9 +1,16 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include + +#include "transport_thread.h" +#include "iexecutable.h" +#include "iocomponent.h" +#include "controlpacket.h" +#include "connector.h" +#include "connection.h" +#include "transport.h" +#include + #include LOG_SETUP(".fnet"); -#include -#include namespace { diff --git a/fnet/src/vespa/fnet/transport_thread.h b/fnet/src/vespa/fnet/transport_thread.h index 6a9bb29e469..41378111419 100644 --- a/fnet/src/vespa/fnet/transport_thread.h +++ b/fnet/src/vespa/fnet/transport_thread.h @@ -2,8 +2,19 @@ #pragma once +#include "scheduler.h" +#include "config.h" +#include "task.h" +#include "packetqueue.h" +#include "stats.h" #include #include +#include + +class FNET_Transport; +class FNET_ControlPacket; +class FNET_IPacketStreamer; +class FNET_IServerAdapter; /** * This class represents a transport thread and handles a subset of @@ -54,7 +65,7 @@ private: FastOS_IOEvent *_events; // I/O event array FNET_PacketQueue_NoLock _queue; // outer event queue FNET_PacketQueue_NoLock _myQueue; // inner event queue - FNET_Cond _cond; // used for synchronization + FastOS_Cond _cond; // used for synchronization bool _started; // event loop started ? bool _shutdown; // should stop event loop ? bool _finished; // event loop stopped ? -- cgit v1.2.3