diff options
author | Tor Egge <Tor.Egge@oath.com> | 2017-10-27 12:00:08 +0000 |
---|---|---|
committer | Tor Egge <Tor.Egge@oath.com> | 2017-10-27 12:00:08 +0000 |
commit | 0dd33f6fac0cdc3e0dc23d6a857f34663f093b84 (patch) | |
tree | 103e9be6cb7b9c103be37f0a508c89f1b6a89cc2 /fnet | |
parent | 9d64263c147c862d7cd9017673d8208db751e781 (diff) |
Use std::mutex and std::condition_variable instead of FastOS_Cond.
Diffstat (limited to 'fnet')
-rw-r--r-- | fnet/src/tests/info/info.cpp | 4 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/connection.cpp | 130 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/connection.h | 14 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/iocomponent.cpp | 21 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/iocomponent.h | 44 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/transport_thread.cpp | 3 |
6 files changed, 90 insertions, 126 deletions
diff --git a/fnet/src/tests/info/info.cpp b/fnet/src/tests/info/info.cpp index ea8de81cff5..f15c8f83d80 100644 --- a/fnet/src/tests/info/info.cpp +++ b/fnet/src/tests/info/info.cpp @@ -70,10 +70,10 @@ TEST("info") { TEST("size of important objects") { - EXPECT_EQUAL(192u, sizeof(FNET_IOComponent)); + EXPECT_EQUAL(184u, sizeof(FNET_IOComponent)); EXPECT_EQUAL(32u, sizeof(FNET_Channel)); EXPECT_EQUAL(40u, sizeof(FNET_PacketQueue_NoLock)); - EXPECT_EQUAL(496u, sizeof(FNET_Connection)); + EXPECT_EQUAL(488u, sizeof(FNET_Connection)); EXPECT_EQUAL(96u, sizeof(FastOS_Cond)); EXPECT_EQUAL(56u, sizeof(FNET_DataBuffer)); EXPECT_EQUAL(24u, sizeof(FastOS_Time)); diff --git a/fnet/src/vespa/fnet/connection.cpp b/fnet/src/vespa/fnet/connection.cpp index bdaeeefd1ae..9fd3f52cc09 100644 --- a/fnet/src/vespa/fnet/connection.cpp +++ b/fnet/src/vespa/fnet/connection.cpp @@ -101,7 +101,7 @@ FNET_Connection::SetState(State state) State oldstate; std::vector<FNET_Channel::UP> toDelete; - Lock(); + std::unique_lock<std::mutex> guard(_ioc_lock); oldstate = _state; _state = state; if (LOG_WOULD_LOG(debug) && state != oldstate) { @@ -113,22 +113,22 @@ FNET_Connection::SetState(State state) if (_flags._writeLock) { _flags._discarding = true; while (_flags._writeLock) - Wait(); + _ioc_cond.wait(guard); _flags._discarding = false; } while (!_queue.IsEmpty_NoLock() || !_myQueue.IsEmpty_NoLock()) { _flags._discarding = true; _queue.FlushPackets_NoLock(&_myQueue); - Unlock(); + guard.unlock(); _myQueue.DiscardPackets_NoLock(); - Lock(); + guard.lock(); _flags._discarding = false; } - BeforeCallback(nullptr); + BeforeCallback(guard, nullptr); toDelete = _channels.Broadcast(&FNET_ControlPacket::ChannelLost); - AfterCallback(); + AfterCallback(guard); } if ( ! toDelete.empty() ) { @@ -142,7 +142,6 @@ FNET_Connection::SetState(State state) } } } - Unlock(); } @@ -154,7 +153,7 @@ FNET_Connection::HandlePacket(uint32_t plen, uint32_t pcode, FNET_Channel *channel; FNET_IPacketHandler::HP_RetCode hp_rc; - Lock(); + std::unique_lock<std::mutex> guard(_ioc_lock); channel = _channels.Lookup(chid); if (channel != nullptr) { // deliver packet on open channel @@ -162,12 +161,12 @@ FNET_Connection::HandlePacket(uint32_t plen, uint32_t pcode, __builtin_prefetch(&_streamer); __builtin_prefetch(&_input); - BeforeCallback(channel); + BeforeCallback(guard, channel); __builtin_prefetch(channel->GetHandler(), 0); // Prefetch the handler while packet is being decoded. packet = _streamer->Decode(&_input, plen, pcode, channel->GetContext()); hp_rc = (packet != nullptr) ? channel->Receive(packet) : channel->Receive(&FNET_ControlPacket::BadPacket); - AfterCallback(); + AfterCallback(guard); FNET_Channel::UP toDelete; if (hp_rc > FNET_IPacketHandler::FNET_KEEP_CHANNEL) { @@ -182,21 +181,18 @@ FNET_Connection::HandlePacket(uint32_t plen, uint32_t pcode, toDelete.reset(channel); } } - - Unlock(); - } else if (CanAcceptChannels() && IsFromPeer(chid)) { // open new channel FNET_Channel::UP newChannel(new FNET_Channel(chid, this)); channel = newChannel.get(); AddRef_NoLock(); - BeforeCallback(channel); + BeforeCallback(guard, channel); if (_serverAdapter->InitChannel(channel, pcode)) { packet = _streamer->Decode(&_input, plen, pcode, channel->GetContext()); hp_rc = (packet != nullptr) ? channel->Receive(packet) : channel->Receive(&FNET_ControlPacket::BadPacket); - AfterCallback(); + AfterCallback(guard); if (hp_rc == FNET_IPacketHandler::FNET_FREE_CHANNEL) { SubRef_NoLock(); @@ -205,14 +201,11 @@ FNET_Connection::HandlePacket(uint32_t plen, uint32_t pcode, } else { newChannel.release(); // It has already been taken care of, so we should not free it here. } - - Unlock(); - } else { - AfterCallback(); + AfterCallback(guard); SubRef_NoLock(); - Unlock(); + guard.unlock(); LOG(debug, "Connection(%s): channel init failed", GetSpec()); _input.DataToDead(plen); @@ -220,7 +213,7 @@ FNET_Connection::HandlePacket(uint32_t plen, uint32_t pcode, } else { // skip unhandled packet - Unlock(); + guard.unlock(); LOG(spam, "Connection(%s): skipping unhandled packet", GetSpec()); _input.DataToDead(plen); } @@ -363,13 +356,14 @@ FNET_Connection::Write(bool direct) } } - Lock(); + std::unique_lock<std::mutex> guard(_ioc_lock); _writeWork = _queue.GetPacketCnt_NoLock() + _myQueue.GetPacketCnt_NoLock() + ((_output.GetDataLen() > 0) ? 1 : 0); _flags._writeLock = false; - if (_flags._discarding) - Broadcast(); + if (_flags._discarding) { + _ioc_cond.notify_all(); + } bool writePending = (_writeWork > 0); if (direct) { // direct write (from post packet) @@ -379,17 +373,15 @@ FNET_Connection::Write(bool direct) } if (writePending) { AddRef_NoLock(); - Unlock(); + guard.unlock(); if (broken) { Owner()->Close(this, /* needRef = */ false); } else { Owner()->EnableWrite(this, /* needRef = */ false); } - } else { - Unlock(); } } else { // normal write (from event loop) - Unlock(); + guard.unlock(); if (writtenData > 0) { CountDataWrite(writtenData); CountPacketWrite(writtenPackets); @@ -547,18 +539,17 @@ FNET_Connection::OpenChannel(FNET_IPacketHandler *handler, FNET_Channel::UP newChannel(new FNET_Channel(FNET_NOID, this, handler, context)); FNET_Channel * ret = nullptr; - Lock(); + std::unique_lock<std::mutex> guard(_ioc_lock); if (__builtin_expect(_state < FNET_CLOSING, true)) { newChannel->SetID(GetNextID()); if (chid != nullptr) { *chid = newChannel->GetID(); } - WaitCallback(nullptr); + WaitCallback(guard, nullptr); AddRef_NoLock(); ret = newChannel.release(); _channels.Register(ret); } - Unlock(); return ret; } @@ -567,10 +558,12 @@ FNET_Channel* FNET_Connection::OpenChannel() { - Lock(); - uint32_t chid = GetNextID(); - AddRef_NoLock(); - Unlock(); + uint32_t chid; + { + std::unique_lock<std::mutex> guard(_ioc_lock); + chid = GetNextID(); + AddRef_NoLock(); + } return new FNET_Channel(chid, this); } @@ -579,10 +572,9 @@ bool FNET_Connection::CloseChannel(FNET_Channel *channel) { bool ret; - Lock(); - WaitCallback(channel); + std::unique_lock<std::mutex> guard(_ioc_lock); + WaitCallback(guard, channel); ret = _channels.Unregister(channel); - Unlock(); return ret; } @@ -591,18 +583,18 @@ void FNET_Connection::FreeChannel(FNET_Channel *channel) { delete channel; - Lock(); - SubRef_HasLock(); + std::unique_lock<std::mutex> guard(_ioc_lock); + SubRef_HasLock(std::move(guard)); } void FNET_Connection::CloseAndFreeChannel(FNET_Channel *channel) { - Lock(); - WaitCallback(channel); + std::unique_lock<std::mutex> guard(_ioc_lock); + WaitCallback(guard, channel); _channels.Unregister(channel); - SubRef_HasLock(); + SubRef_HasLock(std::move(guard)); delete channel; } @@ -610,17 +602,16 @@ FNET_Connection::CloseAndFreeChannel(FNET_Channel *channel) void FNET_Connection::CloseAdminChannel() { - Lock(); FNET_Channel::UP toDelete; + std::unique_lock<std::mutex> guard(_ioc_lock); if (_adminChannel != nullptr) { - WaitCallback(_adminChannel); + WaitCallback(guard, _adminChannel); if (_adminChannel != nullptr) { _channels.Unregister(_adminChannel); toDelete.reset(_adminChannel); _adminChannel = nullptr; } } - Unlock(); } @@ -630,13 +621,12 @@ FNET_Connection::PostPacket(FNET_Packet *packet, uint32_t chid) uint32_t writeWork; assert(packet != nullptr); - Lock(); + std::unique_lock<std::mutex> guard(_ioc_lock); if (_state >= FNET_CLOSING) { if (_flags._discarding) { _queue.QueuePacket_NoLock(packet, FNET_Context(chid)); - Unlock(); } else { - Unlock(); + guard.unlock(); packet->Free(); // discard packet } return false; // connection is down @@ -650,15 +640,13 @@ FNET_Connection::PostPacket(FNET_Packet *packet, uint32_t chid) if (GetConfig()->_directWrite) { _flags._writeLock = true; _queue.FlushPackets_NoLock(&_myQueue); - Unlock(); + guard.unlock(); Write(true); } else { AddRef_NoLock(); - Unlock(); + guard.unlock(); Owner()->EnableWrite(this, /* needRef = */ false); } - } else { - Unlock(); } return true; } @@ -667,10 +655,9 @@ FNET_Connection::PostPacket(FNET_Packet *packet, uint32_t chid) uint32_t FNET_Connection::GetQueueLen() { - Lock(); + std::unique_lock<std::mutex> guard(_ioc_lock); uint32_t ret = _queue.GetPacketCnt_NoLock() + _myQueue.GetPacketCnt_NoLock(); - Unlock(); return ret; } @@ -735,12 +722,14 @@ FNET_Connection::HandleWriteEvent() case FNET_CONNECTING: error = _socket.get_so_error(); if (error == 0) { // connect ok - Lock(); - _state = FNET_CONNECTED; // SetState(FNET_CONNECTED) - LOG(debug, "Connection(%s): State transition: %s -> %s", GetSpec(), - GetStateString(FNET_CONNECTING), GetStateString(FNET_CONNECTED)); - bool writePending = (_writeWork > 0); - Unlock(); + bool writePending; + { + std::unique_lock<std::mutex> guard(_ioc_lock); + _state = FNET_CONNECTED; // SetState(FNET_CONNECTED) + LOG(debug, "Connection(%s): State transition: %s -> %s", GetSpec(), + GetStateString(FNET_CONNECTING), GetStateString(FNET_CONNECTED)); + writePending = (_writeWork > 0); + } if (!writePending) EnableWriteEvent(false); } else { @@ -751,17 +740,20 @@ FNET_Connection::HandleWriteEvent() } break; case FNET_CONNECTED: - Lock(); - if (_flags._writeLock) { - Unlock(); - EnableWriteEvent(false); - return true; + { + { + std::unique_lock<std::mutex> guard(_ioc_lock); + if (_flags._writeLock) { + guard.unlock(); + EnableWriteEvent(false); + return true; + } + _flags._writeLock = true; + _queue.FlushPackets_NoLock(&_myQueue); } - _flags._writeLock = true; - _queue.FlushPackets_NoLock(&_myQueue); - Unlock(); broken = !Write(false); break; + } case FNET_CLOSING: case FNET_CLOSED: default: diff --git a/fnet/src/vespa/fnet/connection.h b/fnet/src/vespa/fnet/connection.h index 699516554e2..1126f8bf1e6 100644 --- a/fnet/src/vespa/fnet/connection.h +++ b/fnet/src/vespa/fnet/connection.h @@ -138,12 +138,12 @@ private: * control packets all channels are callback * targets at the same time. **/ - void WaitCallback(FNET_Channel *channel) + void WaitCallback(std::unique_lock<std::mutex> &guard, FNET_Channel *channel) { while (_flags._inCallback && (_callbackTarget == channel || _callbackTarget == nullptr)) { _flags._callbackWait = true; - Wait(); + _ioc_cond.wait(guard); } } @@ -153,11 +153,11 @@ private: * called. When this method returns the object will be in callback * mode, but not locked. **/ - void BeforeCallback(FNET_Channel *channel) + void BeforeCallback(std::unique_lock<std::mutex> &guard, FNET_Channel *channel) { _flags._inCallback = true; _callbackTarget = channel; - Unlock(); + guard.unlock(); } /** @@ -166,13 +166,13 @@ private: * called, but not locked. When this method returns the object is no * longer in callback mode, but locked. **/ - void AfterCallback() + void AfterCallback(std::unique_lock<std::mutex> &guard) { - Lock(); + guard.lock(); _flags._inCallback = false; if (_flags._callbackWait) { _flags._callbackWait = false; - Broadcast(); + _ioc_cond.notify_all(); } } diff --git a/fnet/src/vespa/fnet/iocomponent.cpp b/fnet/src/vespa/fnet/iocomponent.cpp index e5544ceff0e..ec51c1f080e 100644 --- a/fnet/src/vespa/fnet/iocomponent.cpp +++ b/fnet/src/vespa/fnet/iocomponent.cpp @@ -18,6 +18,7 @@ FNET_IOComponent::FNET_IOComponent(FNET_TransportThread *owner, _ioc_spec(nullptr), _flags(shouldTimeOut), _ioc_timestamp(fastos::ClockSystem::now()), + _ioc_lock(), _ioc_cond(), _ioc_refcnt(1), _ioc_directPacketWriteCnt(0), @@ -47,10 +48,9 @@ FNET_IOComponent::UpdateTimeOut() { void FNET_IOComponent::AddRef() { - Lock(); + std::unique_lock<std::mutex> guard(_ioc_lock); assert(_ioc_refcnt > 0); _ioc_refcnt++; - Unlock(); } @@ -65,27 +65,26 @@ FNET_IOComponent::AddRef_NoLock() void FNET_IOComponent::SubRef() { - Lock(); - assert(_ioc_refcnt > 0); - if (--_ioc_refcnt > 0) { - Unlock(); - return; + { + std::unique_lock<std::mutex> guard(_ioc_lock); + assert(_ioc_refcnt > 0); + if (--_ioc_refcnt > 0) { + return; + } } - Unlock(); CleanupHook(); delete this; } void -FNET_IOComponent::SubRef_HasLock() +FNET_IOComponent::SubRef_HasLock(std::unique_lock<std::mutex> guard) { assert(_ioc_refcnt > 0); if (--_ioc_refcnt > 0) { - Unlock(); return; } - Unlock(); + guard.unlock(); CleanupHook(); delete this; } diff --git a/fnet/src/vespa/fnet/iocomponent.h b/fnet/src/vespa/fnet/iocomponent.h index d0d8cee85b7..a48930428c7 100644 --- a/fnet/src/vespa/fnet/iocomponent.h +++ b/fnet/src/vespa/fnet/iocomponent.h @@ -3,9 +3,10 @@ #pragma once #include "stats.h" -#include <vespa/fastos/cond.h> #include <vespa/fastos/timestamp.h> #include <vespa/vespalib/net/selector.h> +#include <mutex> +#include <condition_variable> class FNET_TransportThread; class FNET_StatCounters; @@ -50,7 +51,8 @@ protected: char *_ioc_spec; // connect/listen spec Flags _flags; // Compressed representation of boolean flags; fastos::TimeStamp _ioc_timestamp; // last I/O activity - FastOS_Cond _ioc_cond; // synchronization + std::mutex _ioc_lock; // synchronization + std::condition_variable _ioc_cond; // synchronization uint32_t _ioc_refcnt; // reference counter // direct write stats kept locally @@ -86,38 +88,10 @@ public: **/ const char *GetSpec() const { return _ioc_spec; } - - /** - * Lock object to gain exclusive access. - **/ - void Lock() { _ioc_cond.Lock(); } - - - /** - * Unlock object to yield exclusive access. - **/ - void Unlock() { _ioc_cond.Unlock(); } - - - /** - * Wait on this object. Caller should have lock on object. - **/ - void Wait() { _ioc_cond.Wait(); } - - - /** - * Signal one thread waiting on this object. Caller should have - * lock. - **/ - void Signal() { _ioc_cond.Signal(); } - - - /** - * Signal all thread waiting on this object. Caller should have - * lock. - **/ - void Broadcast() { _ioc_cond.Broadcast(); } - + /* + * Get a guard to gain exclusive access. + */ + std::unique_lock<std::mutex> getGuard() { return std::unique_lock<std::mutex>(_ioc_lock); } /** * Allocate a reference to this component. This method locks the @@ -145,7 +119,7 @@ public: * protect the reference counter, but assumes that the lock has * already been obtained when this method is called. **/ - void SubRef_HasLock(); + void SubRef_HasLock(std::unique_lock<std::mutex> guard); /** diff --git a/fnet/src/vespa/fnet/transport_thread.cpp b/fnet/src/vespa/fnet/transport_thread.cpp index bd3ad3c53ee..61fcc26f1c6 100644 --- a/fnet/src/vespa/fnet/transport_thread.cpp +++ b/fnet/src/vespa/fnet/transport_thread.cpp @@ -169,9 +169,8 @@ FNET_TransportThread::UpdateStats() for (FNET_IOComponent *comp = _componentsHead; comp != nullptr; comp = comp->_ioc_next) { - comp->Lock(); + auto guard(comp->getGuard()); comp->FlushDirectWriteStats(); - comp->Unlock(); } { std::unique_lock<std::mutex> guard(_lock); |