diff options
author | Tor Egge <Tor.Egge@broadpark.no> | 2017-10-27 17:46:26 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-10-27 17:46:26 +0000 |
commit | 86730de554af723ffd5e4d57b7e3e4d54cb61c0f (patch) | |
tree | 5f1966330a044776f90c2199d9d62e7301f03755 | |
parent | ebdeac00bfe7c6682d258728708887437c0067f3 (diff) | |
parent | 54f460f1842cf4a202ce0358470c631d018e0451 (diff) |
Merge pull request #3926 from vespa-engine/toregge/use-standard-locking-in-fnet
Toregge/use standard locking in fnet
-rw-r--r-- | fnet/src/tests/frt/rpc/invoke.cpp | 49 | ||||
-rw-r--r-- | fnet/src/tests/frt/rpc/session.cpp | 15 | ||||
-rw-r--r-- | fnet/src/tests/info/info.cpp | 8 | ||||
-rw-r--r-- | fnet/src/tests/locking/drainpackets.cpp | 43 | ||||
-rw-r--r-- | fnet/src/tests/locking/lockspeed.cpp | 82 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/connection.cpp | 159 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/connection.h | 15 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/frt/invoker.cpp | 19 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/frt/invoker.h | 6 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/iocomponent.cpp | 21 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/iocomponent.h | 44 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/packetqueue.cpp | 30 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/packetqueue.h | 43 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/scheduler.cpp | 77 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/scheduler.h | 19 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/transport_thread.cpp | 90 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/transport_thread.h | 29 |
17 files changed, 330 insertions, 419 deletions
diff --git a/fnet/src/tests/frt/rpc/invoke.cpp b/fnet/src/tests/frt/rpc/invoke.cpp index 8f4f9dfca90..f44a58dd8b3 100644 --- a/fnet/src/tests/frt/rpc/invoke.cpp +++ b/fnet/src/tests/frt/rpc/invoke.cpp @@ -1,10 +1,12 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include <vespa/vespalib/testkit/test_kit.h> #include <vespa/fnet/frt/frt.h> +#include <mutex> +#include <condition_variable> //------------------------------------------------------------- -FastOS_Mutex _delayedReturnCntLock; +std::mutex _delayedReturnCntLock; uint32_t _delayedReturnCnt = 0; uint32_t _phase_simple_cnt = 0; @@ -20,10 +22,11 @@ uint32_t _phase_echo_cnt = 0; struct LockedReqWait : public FRT_IRequestWait { - FastOS_Cond _cond; // cond used to signal req done + std::mutex _condLock; // cond used to signal req done + std::condition_variable _cond; // cond used to signal req done bool _done; // flag indicating req done - FastOS_Mutex _lockLock; // lock protecting virtual lock + std::mutex _lockLock; // lock protecting virtual lock bool _lock; // virtual lock bool _wasLocked; // was 'locked' when req done @@ -31,38 +34,32 @@ struct LockedReqWait : public FRT_IRequestWait ~LockedReqWait() {} void lock() { - _lockLock.Lock(); + std::lock_guard<std::mutex> guard(_lockLock); _lock = true; - _lockLock.Unlock(); } void unlock() { - _lockLock.Lock(); + std::lock_guard<std::mutex> guard(_lockLock); _lock = false; - _lockLock.Unlock(); } bool isLocked() { - _lockLock.Lock(); - bool ret = _lock; - _lockLock.Unlock(); - return ret; + std::lock_guard<std::mutex> guard(_lockLock); + return _lock; } void RequestDone(FRT_RPCRequest *) override { _wasLocked = isLocked(); - _cond.Lock(); + std::lock_guard<std::mutex> guard(_condLock); _done = true; - _cond.Signal(); - _cond.Unlock(); + _cond.notify_one(); } void waitReq() { - _cond.Lock(); + std::unique_lock<std::mutex> guard(_condLock); while(!_done) { - _cond.Wait(); + _cond.wait(guard); } - _cond.Unlock(); } }; @@ -81,18 +78,18 @@ public: : FNET_Task(sched), _req(req) { - _delayedReturnCntLock.Lock(); - _delayedReturnCnt++; - _delayedReturnCntLock.Unlock(); + { + std::lock_guard<std::mutex> guard(_delayedReturnCntLock); + _delayedReturnCnt++; + } Schedule(delay); } void PerformTask() override { _req->Return(); - _delayedReturnCntLock.Lock(); + std::lock_guard<std::mutex> guard(_delayedReturnCntLock); _delayedReturnCnt--; - _delayedReturnCntLock.Unlock(); } }; @@ -547,9 +544,11 @@ State::WaitForDelayedReturnCount(uint32_t wantedCount, double timeout) FastOS_Time timer; timer.SetNow(); for (;;) { - _delayedReturnCntLock.Lock(); - uint32_t delayedReturnCnt = _delayedReturnCnt; - _delayedReturnCntLock.Unlock(); + uint32_t delayedReturnCnt; + { + std::lock_guard<std::mutex> guard(_delayedReturnCntLock); + delayedReturnCnt = _delayedReturnCnt; + } if (delayedReturnCnt == wantedCount) { return true; } diff --git a/fnet/src/tests/frt/rpc/session.cpp b/fnet/src/tests/frt/rpc/session.cpp index 0316129a8b5..2e920ac9f98 100644 --- a/fnet/src/tests/frt/rpc/session.cpp +++ b/fnet/src/tests/frt/rpc/session.cpp @@ -2,28 +2,27 @@ #include <vespa/vespalib/testkit/test_kit.h> #include <vespa/fnet/frt/frt.h> +#include <mutex> class Session { private: - static FastOS_Mutex _lock; + static std::mutex _lock; static int _cnt; int _val; public: Session() : _val(0) { - _lock.Lock(); - ++_cnt; - _lock.Unlock(); + std::lock_guard<std::mutex> guard(_lock); + ++_cnt; } ~Session() { - _lock.Lock(); - --_cnt; - _lock.Unlock(); + std::lock_guard<std::mutex> guard(_lock); + --_cnt; } void SetValue(int val) { _val = val; } @@ -31,7 +30,7 @@ public: static int GetCnt() { return _cnt; } }; -FastOS_Mutex Session::_lock; +std::mutex Session::_lock; int Session::_cnt(0); diff --git a/fnet/src/tests/info/info.cpp b/fnet/src/tests/info/info.cpp index ea8de81cff5..cd0364cad6f 100644 --- a/fnet/src/tests/info/info.cpp +++ b/fnet/src/tests/info/info.cpp @@ -70,16 +70,16 @@ TEST("info") { TEST("size of important objects") { - EXPECT_EQUAL(192u, sizeof(FNET_IOComponent)); + EXPECT_EQUAL(184u, sizeof(FNET_IOComponent)); EXPECT_EQUAL(32u, sizeof(FNET_Channel)); EXPECT_EQUAL(40u, sizeof(FNET_PacketQueue_NoLock)); - EXPECT_EQUAL(496u, sizeof(FNET_Connection)); - EXPECT_EQUAL(96u, sizeof(FastOS_Cond)); + EXPECT_EQUAL(488u, sizeof(FNET_Connection)); + EXPECT_EQUAL(48u, sizeof(std::condition_variable)); EXPECT_EQUAL(56u, sizeof(FNET_DataBuffer)); EXPECT_EQUAL(24u, sizeof(FastOS_Time)); EXPECT_EQUAL(8u, sizeof(FNET_Context)); EXPECT_EQUAL(8u, sizeof(fastos::TimeStamp)); - EXPECT_EQUAL(48u, sizeof(FastOS_Mutex)); + EXPECT_EQUAL(40u, sizeof(std::mutex)); EXPECT_EQUAL(40u, sizeof(pthread_mutex_t)); EXPECT_EQUAL(48u, sizeof(pthread_cond_t)); EXPECT_EQUAL(40u, sizeof(std::mutex)); diff --git a/fnet/src/tests/locking/drainpackets.cpp b/fnet/src/tests/locking/drainpackets.cpp index 05ba923376d..a2999f0bddc 100644 --- a/fnet/src/tests/locking/drainpackets.cpp +++ b/fnet/src/tests/locking/drainpackets.cpp @@ -1,6 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include <vespa/vespalib/testkit/test_kit.h> #include <vespa/fnet/fnet.h> +#include <mutex> class MyPacket : public FNET_Packet { @@ -16,7 +17,7 @@ TEST("drain packets") { FastOS_Time start; FastOS_Time stop; - FastOS_Mutex lock; + std::mutex lock; FNET_PacketQueue q1(512); FNET_PacketQueue q2(512); @@ -39,25 +40,23 @@ TEST("drain packets") { FNET_Packet *packet; FNET_Context context; - lock.Lock(); - - while (!q1.IsEmpty_NoLock()) { - packet = q1.DequeuePacket_NoLock(&context); - q3.QueuePacket_NoLock(packet, context); + { + std::lock_guard<std::mutex> guard(lock); + while (!q1.IsEmpty_NoLock()) { + packet = q1.DequeuePacket_NoLock(&context); + q3.QueuePacket_NoLock(packet, context); + } } - lock.Unlock(); - //------------------------ - lock.Lock(); - - while (!q3.IsEmpty_NoLock()) { - packet = q3.DequeuePacket_NoLock(&context); - q1.QueuePacket_NoLock(packet, context); + { + std::lock_guard<std::mutex> guard(lock); + while (!q3.IsEmpty_NoLock()) { + packet = q3.DequeuePacket_NoLock(&context); + q1.QueuePacket_NoLock(packet, context); + } } - - lock.Unlock(); } stop.SetNow(); @@ -74,9 +73,10 @@ TEST("drain packets") { FNET_Packet *packet; FNET_Context context; - lock.Lock(); - q1.FlushPackets_NoLock(&q2); - lock.Unlock(); + { + std::lock_guard<std::mutex> guard(lock); + q1.FlushPackets_NoLock(&q2); + } while (!q2.IsEmpty_NoLock()) { packet = q2.DequeuePacket_NoLock(&context); @@ -85,9 +85,10 @@ TEST("drain packets") { //------------------------ - lock.Lock(); - q3.FlushPackets_NoLock(&q2); - lock.Unlock(); + { + std::lock_guard<std::mutex> guard(lock); + q3.FlushPackets_NoLock(&q2); + } while (!q2.IsEmpty_NoLock()) { packet = q2.DequeuePacket_NoLock(&context); diff --git a/fnet/src/tests/locking/lockspeed.cpp b/fnet/src/tests/locking/lockspeed.cpp index 9932f6aa316..b85777f264c 100644 --- a/fnet/src/tests/locking/lockspeed.cpp +++ b/fnet/src/tests/locking/lockspeed.cpp @@ -7,7 +7,7 @@ TEST("lock speed") { FastOS_Time start; FastOS_Time stop; DummyLock dummy; - FastOS_Mutex lock; + std::mutex lock; double dummyTime; double actualTime; double overhead; @@ -46,26 +46,26 @@ TEST("lock speed") { start.SetNow(); for (i = 0; i < 1000000; i++) { - lock.Lock(); - lock.Unlock(); - lock.Lock(); - lock.Unlock(); - lock.Lock(); - lock.Unlock(); - lock.Lock(); - lock.Unlock(); - lock.Lock(); - lock.Unlock(); - lock.Lock(); - lock.Unlock(); - lock.Lock(); - lock.Unlock(); - lock.Lock(); - lock.Unlock(); - lock.Lock(); - lock.Unlock(); - lock.Lock(); - lock.Unlock(); + lock.lock(); + lock.unlock(); + lock.lock(); + lock.unlock(); + lock.lock(); + lock.unlock(); + lock.lock(); + lock.unlock(); + lock.lock(); + lock.unlock(); + lock.lock(); + lock.unlock(); + lock.lock(); + lock.unlock(); + lock.lock(); + lock.unlock(); + lock.lock(); + lock.unlock(); + lock.lock(); + lock.unlock(); } stop.SetNow(); stop -= start; @@ -85,16 +85,16 @@ TEST("lock speed") { start.SetNow(); for (i = 0; i < 1000000; i++) { - FastOS_Mutex lock0; - FastOS_Mutex lock1; - FastOS_Mutex lock2; - FastOS_Mutex lock3; - FastOS_Mutex lock4; - FastOS_Mutex lock5; - FastOS_Mutex lock6; - FastOS_Mutex lock7; - FastOS_Mutex lock8; - FastOS_Mutex lock9; + std::mutex lock0; + std::mutex lock1; + std::mutex lock2; + std::mutex lock3; + std::mutex lock4; + std::mutex lock5; + std::mutex lock6; + std::mutex lock7; + std::mutex lock8; + std::mutex lock9; } stop.SetNow(); stop -= start; @@ -105,16 +105,16 @@ TEST("lock speed") { start.SetNow(); for (i = 0; i < 1000000; i++) { - FastOS_Cond cond0; - FastOS_Cond cond1; - FastOS_Cond cond2; - FastOS_Cond cond3; - FastOS_Cond cond4; - FastOS_Cond cond5; - FastOS_Cond cond6; - FastOS_Cond cond7; - FastOS_Cond cond8; - FastOS_Cond cond9; + std::condition_variable cond0; + std::condition_variable cond1; + std::condition_variable cond2; + std::condition_variable cond3; + std::condition_variable cond4; + std::condition_variable cond5; + std::condition_variable cond6; + std::condition_variable cond7; + std::condition_variable cond8; + std::condition_variable cond9; } stop.SetNow(); stop -= start; diff --git a/fnet/src/vespa/fnet/connection.cpp b/fnet/src/vespa/fnet/connection.cpp index 180e7ea9adf..f2a4ab3a23d 100644 --- a/fnet/src/vespa/fnet/connection.cpp +++ b/fnet/src/vespa/fnet/connection.cpp @@ -16,25 +16,26 @@ LOG_SETUP(".fnet"); namespace { class SyncPacket : public FNET_DummyPacket { private: - FastOS_Cond _cond; + std::mutex _lock; + std::condition_variable _cond; bool _done; bool _waiting; public: SyncPacket() - : _cond(), + : _lock(), + _cond(), _done(false), _waiting(false) {} ~SyncPacket() {} void WaitFree() { - _cond.Lock(); + std::unique_lock<std::mutex> guard(_lock); _waiting = true; while (!_done) - _cond.Wait(); + _cond.wait(guard); _waiting = false; - _cond.Unlock(); } void Free() override; @@ -44,11 +45,11 @@ public: void SyncPacket::Free() { - _cond.Lock(); + std::unique_lock<std::mutex> guard(_lock); _done = true; - if (_waiting) - _cond.Signal(); - _cond.Unlock(); + if (_waiting) { + _cond.notify_one(); + } } } @@ -100,7 +101,7 @@ FNET_Connection::SetState(State state) State oldstate; std::vector<FNET_Channel::UP> toDelete; - Lock(); + std::unique_lock<std::mutex> guard(_ioc_lock); oldstate = _state; _state = state; if (LOG_WOULD_LOG(debug) && state != oldstate) { @@ -112,22 +113,22 @@ FNET_Connection::SetState(State state) if (_flags._writeLock) { _flags._discarding = true; while (_flags._writeLock) - Wait(); + _ioc_cond.wait(guard); _flags._discarding = false; } while (!_queue.IsEmpty_NoLock() || !_myQueue.IsEmpty_NoLock()) { _flags._discarding = true; _queue.FlushPackets_NoLock(&_myQueue); - Unlock(); + guard.unlock(); _myQueue.DiscardPackets_NoLock(); - Lock(); + guard.lock(); _flags._discarding = false; } - BeforeCallback(nullptr); + BeforeCallback(guard, nullptr); toDelete = _channels.Broadcast(&FNET_ControlPacket::ChannelLost); - AfterCallback(); + AfterCallback(guard); } if ( ! toDelete.empty() ) { @@ -141,7 +142,6 @@ FNET_Connection::SetState(State state) } } } - Unlock(); } @@ -153,7 +153,7 @@ FNET_Connection::HandlePacket(uint32_t plen, uint32_t pcode, FNET_Channel *channel; FNET_IPacketHandler::HP_RetCode hp_rc; - Lock(); + std::unique_lock<std::mutex> guard(_ioc_lock); channel = _channels.Lookup(chid); if (channel != nullptr) { // deliver packet on open channel @@ -161,12 +161,12 @@ FNET_Connection::HandlePacket(uint32_t plen, uint32_t pcode, __builtin_prefetch(&_streamer); __builtin_prefetch(&_input); - BeforeCallback(channel); + BeforeCallback(guard, channel); __builtin_prefetch(channel->GetHandler(), 0); // Prefetch the handler while packet is being decoded. packet = _streamer->Decode(&_input, plen, pcode, channel->GetContext()); hp_rc = (packet != nullptr) ? channel->Receive(packet) : channel->Receive(&FNET_ControlPacket::BadPacket); - AfterCallback(); + AfterCallback(guard); FNET_Channel::UP toDelete; if (hp_rc > FNET_IPacketHandler::FNET_KEEP_CHANNEL) { @@ -181,21 +181,18 @@ FNET_Connection::HandlePacket(uint32_t plen, uint32_t pcode, toDelete.reset(channel); } } - - Unlock(); - } else if (CanAcceptChannels() && IsFromPeer(chid)) { // open new channel FNET_Channel::UP newChannel(new FNET_Channel(chid, this)); channel = newChannel.get(); AddRef_NoLock(); - BeforeCallback(channel); + BeforeCallback(guard, channel); if (_serverAdapter->InitChannel(channel, pcode)) { packet = _streamer->Decode(&_input, plen, pcode, channel->GetContext()); hp_rc = (packet != nullptr) ? channel->Receive(packet) : channel->Receive(&FNET_ControlPacket::BadPacket); - AfterCallback(); + AfterCallback(guard); if (hp_rc == FNET_IPacketHandler::FNET_FREE_CHANNEL) { SubRef_NoLock(); @@ -204,14 +201,11 @@ FNET_Connection::HandlePacket(uint32_t plen, uint32_t pcode, } else { newChannel.release(); // It has already been taken care of, so we should not free it here. } - - Unlock(); - } else { - AfterCallback(); + AfterCallback(guard); SubRef_NoLock(); - Unlock(); + guard.unlock(); LOG(debug, "Connection(%s): channel init failed", GetSpec()); _input.DataToDead(plen); @@ -219,7 +213,7 @@ FNET_Connection::HandlePacket(uint32_t plen, uint32_t pcode, } else { // skip unhandled packet - Unlock(); + guard.unlock(); LOG(spam, "Connection(%s): skipping unhandled packet", GetSpec()); _input.DataToDead(plen); } @@ -362,13 +356,14 @@ FNET_Connection::Write(bool direct) } } - Lock(); + std::unique_lock<std::mutex> guard(_ioc_lock); _writeWork = _queue.GetPacketCnt_NoLock() + _myQueue.GetPacketCnt_NoLock() + ((_output.GetDataLen() > 0) ? 1 : 0); _flags._writeLock = false; - if (_flags._discarding) - Broadcast(); + if (_flags._discarding) { + _ioc_cond.notify_all(); + } bool writePending = (_writeWork > 0); if (direct) { // direct write (from post packet) @@ -378,17 +373,15 @@ FNET_Connection::Write(bool direct) } if (writePending) { AddRef_NoLock(); - Unlock(); + guard.unlock(); if (broken) { Owner()->Close(this, /* needRef = */ false); } else { Owner()->EnableWrite(this, /* needRef = */ false); } - } else { - Unlock(); } } else { // normal write (from event loop) - Unlock(); + guard.unlock(); if (writtenData > 0) { CountDataWrite(writtenData); CountPacketWrite(writtenPackets); @@ -546,18 +539,17 @@ FNET_Connection::OpenChannel(FNET_IPacketHandler *handler, FNET_Channel::UP newChannel(new FNET_Channel(FNET_NOID, this, handler, context)); FNET_Channel * ret = nullptr; - Lock(); + std::unique_lock<std::mutex> guard(_ioc_lock); if (__builtin_expect(_state < FNET_CLOSING, true)) { newChannel->SetID(GetNextID()); if (chid != nullptr) { *chid = newChannel->GetID(); } - WaitCallback(nullptr); + WaitCallback(guard, nullptr); AddRef_NoLock(); ret = newChannel.release(); _channels.Register(ret); } - Unlock(); return ret; } @@ -566,10 +558,12 @@ FNET_Channel* FNET_Connection::OpenChannel() { - Lock(); - uint32_t chid = GetNextID(); - AddRef_NoLock(); - Unlock(); + uint32_t chid; + { + std::unique_lock<std::mutex> guard(_ioc_lock); + chid = GetNextID(); + AddRef_NoLock(); + } return new FNET_Channel(chid, this); } @@ -577,12 +571,9 @@ FNET_Connection::OpenChannel() bool FNET_Connection::CloseChannel(FNET_Channel *channel) { - bool ret; - Lock(); - WaitCallback(channel); - ret = _channels.Unregister(channel); - Unlock(); - return ret; + std::unique_lock<std::mutex> guard(_ioc_lock); + WaitCallback(guard, channel); + return _channels.Unregister(channel); } @@ -590,18 +581,17 @@ void FNET_Connection::FreeChannel(FNET_Channel *channel) { delete channel; - Lock(); - SubRef_HasLock(); + SubRef_HasLock(std::unique_lock<std::mutex>(_ioc_lock)); } void FNET_Connection::CloseAndFreeChannel(FNET_Channel *channel) { - Lock(); - WaitCallback(channel); + std::unique_lock<std::mutex> guard(_ioc_lock); + WaitCallback(guard, channel); _channels.Unregister(channel); - SubRef_HasLock(); + SubRef_HasLock(std::move(guard)); delete channel; } @@ -609,17 +599,16 @@ FNET_Connection::CloseAndFreeChannel(FNET_Channel *channel) void FNET_Connection::CloseAdminChannel() { - Lock(); FNET_Channel::UP toDelete; + std::unique_lock<std::mutex> guard(_ioc_lock); if (_adminChannel != nullptr) { - WaitCallback(_adminChannel); + WaitCallback(guard, _adminChannel); if (_adminChannel != nullptr) { _channels.Unregister(_adminChannel); toDelete.reset(_adminChannel); _adminChannel = nullptr; } } - Unlock(); } @@ -629,13 +618,12 @@ FNET_Connection::PostPacket(FNET_Packet *packet, uint32_t chid) uint32_t writeWork; assert(packet != nullptr); - Lock(); + std::unique_lock<std::mutex> guard(_ioc_lock); if (_state >= FNET_CLOSING) { if (_flags._discarding) { _queue.QueuePacket_NoLock(packet, FNET_Context(chid)); - Unlock(); } else { - Unlock(); + guard.unlock(); packet->Free(); // discard packet } return false; // connection is down @@ -649,15 +637,13 @@ FNET_Connection::PostPacket(FNET_Packet *packet, uint32_t chid) if (GetConfig()->_directWrite) { _flags._writeLock = true; _queue.FlushPackets_NoLock(&_myQueue); - Unlock(); + guard.unlock(); Write(true); } else { AddRef_NoLock(); - Unlock(); + guard.unlock(); Owner()->EnableWrite(this, /* needRef = */ false); } - } else { - Unlock(); } return true; } @@ -666,11 +652,8 @@ FNET_Connection::PostPacket(FNET_Packet *packet, uint32_t chid) uint32_t FNET_Connection::GetQueueLen() { - Lock(); - uint32_t ret = _queue.GetPacketCnt_NoLock() - + _myQueue.GetPacketCnt_NoLock(); - Unlock(); - return ret; + std::unique_lock<std::mutex> guard(_ioc_lock); + return _queue.GetPacketCnt_NoLock() + _myQueue.GetPacketCnt_NoLock(); } @@ -725,6 +708,16 @@ FNET_Connection::HandleReadEvent() bool +FNET_Connection::writePendingAfterConnect() +{ + std::unique_lock<std::mutex> guard(_ioc_lock); + _state = FNET_CONNECTED; // SetState(FNET_CONNECTED) + LOG(debug, "Connection(%s): State transition: %s -> %s", GetSpec(), + GetStateString(FNET_CONNECTING), GetStateString(FNET_CONNECTED)); + return (_writeWork > 0); +} + +bool FNET_Connection::HandleWriteEvent() { int error; // socket error code @@ -734,14 +727,9 @@ FNET_Connection::HandleWriteEvent() case FNET_CONNECTING: error = _socket.get_so_error(); if (error == 0) { // connect ok - Lock(); - _state = FNET_CONNECTED; // SetState(FNET_CONNECTED) - LOG(debug, "Connection(%s): State transition: %s -> %s", GetSpec(), - GetStateString(FNET_CONNECTING), GetStateString(FNET_CONNECTED)); - bool writePending = (_writeWork > 0); - Unlock(); - if (!writePending) + if (!writePendingAfterConnect()) { EnableWriteEvent(false); + } } else { LOG(debug, "Connection(%s): connect error: %d", GetSpec(), error); @@ -750,15 +738,16 @@ FNET_Connection::HandleWriteEvent() } break; case FNET_CONNECTED: - Lock(); - if (_flags._writeLock) { - Unlock(); - EnableWriteEvent(false); - return true; + { + std::unique_lock<std::mutex> guard(_ioc_lock); + if (_flags._writeLock) { + guard.unlock(); + EnableWriteEvent(false); + return true; + } + _flags._writeLock = true; + _queue.FlushPackets_NoLock(&_myQueue); } - _flags._writeLock = true; - _queue.FlushPackets_NoLock(&_myQueue); - Unlock(); broken = !Write(false); break; case FNET_CLOSING: diff --git a/fnet/src/vespa/fnet/connection.h b/fnet/src/vespa/fnet/connection.h index 699516554e2..ef883d84989 100644 --- a/fnet/src/vespa/fnet/connection.h +++ b/fnet/src/vespa/fnet/connection.h @@ -138,12 +138,12 @@ private: * control packets all channels are callback * targets at the same time. **/ - void WaitCallback(FNET_Channel *channel) + void WaitCallback(std::unique_lock<std::mutex> &guard, FNET_Channel *channel) { while (_flags._inCallback && (_callbackTarget == channel || _callbackTarget == nullptr)) { _flags._callbackWait = true; - Wait(); + _ioc_cond.wait(guard); } } @@ -153,11 +153,11 @@ private: * called. When this method returns the object will be in callback * mode, but not locked. **/ - void BeforeCallback(FNET_Channel *channel) + void BeforeCallback(std::unique_lock<std::mutex> &guard, FNET_Channel *channel) { _flags._inCallback = true; _callbackTarget = channel; - Unlock(); + guard.unlock(); } /** @@ -166,13 +166,13 @@ private: * called, but not locked. When this method returns the object is no * longer in callback mode, but locked. **/ - void AfterCallback() + void AfterCallback(std::unique_lock<std::mutex> &guard) { - Lock(); + guard.lock(); _flags._inCallback = false; if (_flags._callbackWait) { _flags._callbackWait = false; - Broadcast(); + _ioc_cond.notify_all(); } } @@ -215,6 +215,7 @@ private: **/ bool Write(bool direct); + bool writePendingAfterConnect(); public: /** diff --git a/fnet/src/vespa/fnet/frt/invoker.cpp b/fnet/src/vespa/fnet/frt/invoker.cpp index 0cb364935df..5afc355c68f 100644 --- a/fnet/src/vespa/fnet/frt/invoker.cpp +++ b/fnet/src/vespa/fnet/frt/invoker.cpp @@ -8,7 +8,8 @@ LOG_SETUP(".fnet.frt.invoker"); FRT_SingleReqWait::FRT_SingleReqWait() - : _cond(), + : _lock(), + _cond(), _done(false), _waiting(false) { } @@ -18,12 +19,12 @@ FRT_SingleReqWait::~FRT_SingleReqWait() {} void FRT_SingleReqWait::WaitReq() { - _cond.Lock(); + std::unique_lock<std::mutex> guard(_lock); _waiting = true; - while(!_done) - _cond.Wait(); + while(!_done) { + _cond.wait(guard); + } _waiting = false; - _cond.Unlock(); } @@ -31,11 +32,11 @@ void FRT_SingleReqWait::RequestDone(FRT_RPCRequest *req) { (void) req; - _cond.Lock(); + std::unique_lock<std::mutex> guard(_lock); _done = true; - if (_waiting) - _cond.Signal(); - _cond.Unlock(); + if (_waiting) { + _cond.notify_one(); + } } diff --git a/fnet/src/vespa/fnet/frt/invoker.h b/fnet/src/vespa/fnet/frt/invoker.h index 7716430f842..15d74017200 100644 --- a/fnet/src/vespa/fnet/frt/invoker.h +++ b/fnet/src/vespa/fnet/frt/invoker.h @@ -5,8 +5,9 @@ #include "rpcrequest.h" #include <vespa/fnet/task.h> #include <vespa/fnet/ipackethandler.h> -#include <vespa/fastos/cond.h> #include <vespa/fastos/thread.h> +#include <mutex> +#include <condition_variable> class FRT_Method; class FRT_Supervisor; @@ -29,7 +30,8 @@ public: class FRT_SingleReqWait : public FRT_IRequestWait { private: - FastOS_Cond _cond; + std::mutex _lock; + std::condition_variable _cond; bool _done; bool _waiting; diff --git a/fnet/src/vespa/fnet/iocomponent.cpp b/fnet/src/vespa/fnet/iocomponent.cpp index e5544ceff0e..ec51c1f080e 100644 --- a/fnet/src/vespa/fnet/iocomponent.cpp +++ b/fnet/src/vespa/fnet/iocomponent.cpp @@ -18,6 +18,7 @@ FNET_IOComponent::FNET_IOComponent(FNET_TransportThread *owner, _ioc_spec(nullptr), _flags(shouldTimeOut), _ioc_timestamp(fastos::ClockSystem::now()), + _ioc_lock(), _ioc_cond(), _ioc_refcnt(1), _ioc_directPacketWriteCnt(0), @@ -47,10 +48,9 @@ FNET_IOComponent::UpdateTimeOut() { void FNET_IOComponent::AddRef() { - Lock(); + std::unique_lock<std::mutex> guard(_ioc_lock); assert(_ioc_refcnt > 0); _ioc_refcnt++; - Unlock(); } @@ -65,27 +65,26 @@ FNET_IOComponent::AddRef_NoLock() void FNET_IOComponent::SubRef() { - Lock(); - assert(_ioc_refcnt > 0); - if (--_ioc_refcnt > 0) { - Unlock(); - return; + { + std::unique_lock<std::mutex> guard(_ioc_lock); + assert(_ioc_refcnt > 0); + if (--_ioc_refcnt > 0) { + return; + } } - Unlock(); CleanupHook(); delete this; } void -FNET_IOComponent::SubRef_HasLock() +FNET_IOComponent::SubRef_HasLock(std::unique_lock<std::mutex> guard) { assert(_ioc_refcnt > 0); if (--_ioc_refcnt > 0) { - Unlock(); return; } - Unlock(); + guard.unlock(); CleanupHook(); delete this; } diff --git a/fnet/src/vespa/fnet/iocomponent.h b/fnet/src/vespa/fnet/iocomponent.h index d0d8cee85b7..a48930428c7 100644 --- a/fnet/src/vespa/fnet/iocomponent.h +++ b/fnet/src/vespa/fnet/iocomponent.h @@ -3,9 +3,10 @@ #pragma once #include "stats.h" -#include <vespa/fastos/cond.h> #include <vespa/fastos/timestamp.h> #include <vespa/vespalib/net/selector.h> +#include <mutex> +#include <condition_variable> class FNET_TransportThread; class FNET_StatCounters; @@ -50,7 +51,8 @@ protected: char *_ioc_spec; // connect/listen spec Flags _flags; // Compressed representation of boolean flags; fastos::TimeStamp _ioc_timestamp; // last I/O activity - FastOS_Cond _ioc_cond; // synchronization + std::mutex _ioc_lock; // synchronization + std::condition_variable _ioc_cond; // synchronization uint32_t _ioc_refcnt; // reference counter // direct write stats kept locally @@ -86,38 +88,10 @@ public: **/ const char *GetSpec() const { return _ioc_spec; } - - /** - * Lock object to gain exclusive access. - **/ - void Lock() { _ioc_cond.Lock(); } - - - /** - * Unlock object to yield exclusive access. - **/ - void Unlock() { _ioc_cond.Unlock(); } - - - /** - * Wait on this object. Caller should have lock on object. - **/ - void Wait() { _ioc_cond.Wait(); } - - - /** - * Signal one thread waiting on this object. Caller should have - * lock. - **/ - void Signal() { _ioc_cond.Signal(); } - - - /** - * Signal all thread waiting on this object. Caller should have - * lock. - **/ - void Broadcast() { _ioc_cond.Broadcast(); } - + /* + * Get a guard to gain exclusive access. + */ + std::unique_lock<std::mutex> getGuard() { return std::unique_lock<std::mutex>(_ioc_lock); } /** * Allocate a reference to this component. This method locks the @@ -145,7 +119,7 @@ public: * protect the reference counter, but assumes that the lock has * already been obtained when this method is called. **/ - void SubRef_HasLock(); + void SubRef_HasLock(std::unique_lock<std::mutex> guard); /** diff --git a/fnet/src/vespa/fnet/packetqueue.cpp b/fnet/src/vespa/fnet/packetqueue.cpp index b2ed08d4b04..4fdb5842a9d 100644 --- a/fnet/src/vespa/fnet/packetqueue.cpp +++ b/fnet/src/vespa/fnet/packetqueue.cpp @@ -4,6 +4,7 @@ #include "packet.h" #include <vespa/fastos/time.h> #include <cassert> +#include <chrono> void FNET_PacketQueue_NoLock::ExpandBuf(uint32_t needentries) @@ -166,6 +167,7 @@ FNET_PacketQueue_NoLock::Print(uint32_t indent) FNET_PacketQueue::FNET_PacketQueue(uint32_t len, HP_RetCode hpRetCode) : FNET_PacketQueue_NoLock(len, hpRetCode), + _lock(), _cond(), _waitCnt(0) { @@ -190,16 +192,16 @@ void FNET_PacketQueue::QueuePacket(FNET_Packet *packet, FNET_Context context) { assert(packet != nullptr); - Lock(); + std::unique_lock<std::mutex> guard(_lock); EnsureFree(); _buf[_in_pos]._packet = packet; // insert packet ref. _buf[_in_pos]._context = context; if (++_in_pos == _bufsize) _in_pos = 0; // wrap around. _bufused++; - if (_waitCnt >= _bufused) // signal waiting thread(s) - Signal(); - Unlock(); + if (_waitCnt >= _bufused) { // signal waiting thread(s) + _cond.notify_one(); + } } @@ -207,17 +209,17 @@ FNET_Packet* FNET_PacketQueue::DequeuePacket(FNET_Context *context) { FNET_Packet *packet = nullptr; - Lock(); + std::unique_lock<std::mutex> guard(_lock); _waitCnt++; - while (_bufused == 0) - Wait(); + while (_bufused == 0) { + _cond.wait(guard); + } _waitCnt--; packet = _buf[_out_pos]._packet; *context = _buf[_out_pos]._context; if (++_out_pos == _bufsize) _out_pos = 0; // wrap around _bufused--; - Unlock(); return packet; } @@ -231,14 +233,14 @@ FNET_PacketQueue::DequeuePacket(uint32_t maxwait, FNET_Context *context) if (maxwait > 0) startTime.SetNow(); - Lock(); + std::unique_lock<std::mutex> guard(_lock); if (maxwait > 0) { bool timeout = false; _waitCnt++; - while ((_bufused == 0) && !timeout - && (waitTime = (int)(maxwait - startTime.MilliSecsToNow())) > 0) - timeout = !TimedWait(waitTime); + while ((_bufused == 0) && !timeout && (waitTime = (int)(maxwait - startTime.MilliSecsToNow())) > 0) { + timeout = _cond.wait_for(guard, std::chrono::milliseconds(waitTime)) == std::cv_status::timeout; + } _waitCnt--; } if (_bufused > 0) { @@ -248,7 +250,6 @@ FNET_PacketQueue::DequeuePacket(uint32_t maxwait, FNET_Context *context) _out_pos = 0; // wrap around _bufused--; } - Unlock(); return packet; } @@ -256,7 +257,7 @@ FNET_PacketQueue::DequeuePacket(uint32_t maxwait, FNET_Context *context) void FNET_PacketQueue::Print(uint32_t indent) { - Lock(); + std::unique_lock<std::mutex> guard(_lock); uint32_t i = _out_pos; uint32_t cnt = _bufused; @@ -273,5 +274,4 @@ FNET_PacketQueue::Print(uint32_t indent) _buf[i]._context.Print(indent + 2); } printf("%*s}\n", indent, ""); - Unlock(); } diff --git a/fnet/src/vespa/fnet/packetqueue.h b/fnet/src/vespa/fnet/packetqueue.h index 099410ed385..1daca901120 100644 --- a/fnet/src/vespa/fnet/packetqueue.h +++ b/fnet/src/vespa/fnet/packetqueue.h @@ -3,7 +3,8 @@ #pragma once #include "ipackethandler.h" -#include <vespa/fastos/cond.h> +#include <mutex> +#include <condition_variable> /** * This class implements a queue of packets. Being in a queue does not @@ -174,8 +175,9 @@ private: protected: - FastOS_Cond _cond; - uint32_t _waitCnt; + std::mutex _lock; + std::condition_variable _cond; + uint32_t _waitCnt; public: @@ -190,41 +192,6 @@ public: FNET_PacketQueue(uint32_t len = 64, HP_RetCode hpRetCode = FNET_KEEP_CHANNEL); ~FNET_PacketQueue(); - - /** - * Lock this queue to gain exclusive access. - **/ - void Lock() { _cond.Lock(); } - - - /** - * Unlock this queue to yield exclusive access. - **/ - void Unlock() { _cond.Unlock(); } - - - /** - * Wait for a signal on this queue. - **/ - void Wait() { _cond.Wait(); } - - - /** - * Wait for a signal on this queue, but time out after ms - * milliseconds. - * - * @return true(got signal)/false(timeout). - * @param ms number of milliseconds to wait before timing out. - **/ - bool TimedWait(int ms) { return _cond.TimedWait(ms); } - - - /** - * Signal one thread waiting on this queue. - **/ - void Signal() { _cond.Signal(); } - - /** * Handle incoming packet by putting it on the queue. This method * uses the hpRetCode value given to the constructor to decide what diff --git a/fnet/src/vespa/fnet/scheduler.cpp b/fnet/src/vespa/fnet/scheduler.cpp index 6e0fc3b8255..ef67407cb44 100644 --- a/fnet/src/vespa/fnet/scheduler.cpp +++ b/fnet/src/vespa/fnet/scheduler.cpp @@ -39,23 +39,24 @@ FNET_Scheduler::~FNET_Scheduler() if (LOG_WOULD_LOG(debug)) { bool empty = true; std::stringstream dump; - Lock(); - dump << "FNET_Scheduler {" << std::endl; - dump << " [slot=" << _currSlot << "][iter=" << _currIter << "]" << std::endl; - for (int i = 0; i <= NUM_SLOTS; i++) { - FNET_Task *pt = _slots[i]; - if (pt != nullptr) { - empty = false; - FNET_Task *end = pt; - do { - dump << " FNET_Task { slot=" << pt->_task_slot; - dump << ", iter=" << pt->_task_iter << " }" << std::endl; - pt = pt->_task_next; - } while (pt != end); + { + std::unique_lock<std::mutex> guard(_lock); + dump << "FNET_Scheduler {" << std::endl; + dump << " [slot=" << _currSlot << "][iter=" << _currIter << "]" << std::endl; + for (int i = 0; i <= NUM_SLOTS; i++) { + FNET_Task *pt = _slots[i]; + if (pt != nullptr) { + empty = false; + FNET_Task *end = pt; + do { + dump << " FNET_Task { slot=" << pt->_task_slot; + dump << ", iter=" << pt->_task_iter << " }" << std::endl; + pt = pt->_task_next; + } while (pt != end); + } } + dump << "}" << std::endl; } - dump << "}" << std::endl; - Unlock(); if (!empty) { LOG(debug, "~FNET_Scheduler(): tasks still pending when deleted" "\n%s", dump.str().c_str()); @@ -69,7 +70,7 @@ FNET_Scheduler::Schedule(FNET_Task *task, double seconds) { uint32_t ticks = 1 + (uint32_t) (seconds * (1000 / SLOT_TICK) + 0.5); - Lock(); + std::unique_lock<std::mutex> guard(_lock); if (!task->_killed) { if (IsActive(task)) LinkOut(task); @@ -77,14 +78,13 @@ FNET_Scheduler::Schedule(FNET_Task *task, double seconds) task->_task_iter = _currIter + ((ticks + _currSlot) >> SLOTS_SHIFT); LinkIn(task); } - Unlock(); } void FNET_Scheduler::ScheduleNow(FNET_Task *task) { - Lock(); + std::unique_lock<std::mutex> guard(_lock); if (!task->_killed) { if (IsActive(task)) LinkOut(task); @@ -92,37 +92,34 @@ FNET_Scheduler::ScheduleNow(FNET_Task *task) task->_task_iter = 0; LinkIn(task); } - Unlock(); } void FNET_Scheduler::Unschedule(FNET_Task *task) { - Lock(); - WaitTask(task); + std::unique_lock<std::mutex> guard(_lock); + WaitTask(guard, task); if (IsActive(task)) LinkOut(task); - Unlock(); } void FNET_Scheduler::Kill(FNET_Task *task) { - Lock(); - WaitTask(task); + std::unique_lock<std::mutex> guard(_lock); + WaitTask(guard, task); if (IsActive(task)) LinkOut(task); task->_killed = true; - Unlock(); } void FNET_Scheduler::Print(FILE *dst) { - Lock(); + std::unique_lock<std::mutex> guard(_lock); fprintf(dst, "FNET_Scheduler {\n"); fprintf(dst, " [slot=%d][iter=%d]\n", _currSlot, _currIter); for (int i = 0; i <= NUM_SLOTS; i++) { @@ -137,7 +134,6 @@ FNET_Scheduler::Print(FILE *dst) } } fprintf(dst, "}\n"); - Unlock(); } @@ -155,11 +151,11 @@ FNET_Scheduler::CheckTasks() if (_slots[NUM_SLOTS] == nullptr && _now < _next) return; - Lock(); + std::unique_lock<std::mutex> guard(_lock); // perform urgent tasks - PerformTasks(NUM_SLOTS, 0); + PerformTasks(guard, NUM_SLOTS, 0); // handle bucket timeout(s) @@ -169,10 +165,9 @@ FNET_Scheduler::CheckTasks() _currSlot = 0; _currIter++; } - PerformTasks(_currSlot, _currIter); + PerformTasks(guard, _currSlot, _currIter); } } - Unlock(); } void @@ -237,40 +232,40 @@ FNET_Scheduler::LinkOut(FNET_Task *task) { } void -FNET_Scheduler::BeforeTask(FNET_Task *task) { +FNET_Scheduler::BeforeTask(std::unique_lock<std::mutex> &guard, FNET_Task *task) { _performing = task; - Unlock(); + guard.unlock(); } void -FNET_Scheduler::AfterTask() { - Lock(); +FNET_Scheduler::AfterTask(std::unique_lock<std::mutex> &guard) { + guard.lock(); _performing = nullptr; if (_waitTask) { _waitTask = false; - Broadcast(); + _cond.notify_all(); } } void -FNET_Scheduler::WaitTask(FNET_Task *task) { +FNET_Scheduler::WaitTask(std::unique_lock<std::mutex> &guard, FNET_Task *task) { while (IsPerforming(task)) { _waitTask = true; - Wait(); + _cond.wait(guard); } } void -FNET_Scheduler::PerformTasks(uint32_t slot, uint32_t iter) { +FNET_Scheduler::PerformTasks(std::unique_lock<std::mutex> &guard, uint32_t slot, uint32_t iter) { FirstTask(slot); for (FNET_Task *task; (task = GetTask()) != nullptr; ) { NextTask(); if (task->_task_iter == iter) { LinkOut(task); - BeforeTask(task); + BeforeTask(guard, task); task->PerformTask(); // PERFORM TASK - AfterTask(); + AfterTask(guard); } } } diff --git a/fnet/src/vespa/fnet/scheduler.h b/fnet/src/vespa/fnet/scheduler.h index 68238fdbedc..82f0de068c1 100644 --- a/fnet/src/vespa/fnet/scheduler.h +++ b/fnet/src/vespa/fnet/scheduler.h @@ -3,7 +3,8 @@ #pragma once #include <vespa/fastos/time.h> -#include <vespa/fastos/cond.h> +#include <mutex> +#include <condition_variable> class FNET_Task; @@ -27,7 +28,8 @@ public: }; private: - FastOS_Cond _cond; + std::mutex _lock; + std::condition_variable _cond; FNET_Task *_slots[NUM_SLOTS + 1]; FastOS_Time _next; FastOS_Time _now; @@ -42,11 +44,6 @@ private: FNET_Scheduler(const FNET_Scheduler &); FNET_Scheduler &operator=(const FNET_Scheduler &); - void Lock() { _cond.Lock(); } - void Unlock() { _cond.Unlock(); } - void Wait() { _cond.Wait(); } - void Broadcast() { _cond.Broadcast(); } - FNET_Task *GetTask() { return _currPt; } void FirstTask(uint32_t slot); @@ -56,10 +53,10 @@ private: void LinkIn(FNET_Task *task); void LinkOut(FNET_Task *task); bool IsPerforming(FNET_Task *task) { return task == _performing; } - void BeforeTask(FNET_Task *task); - void AfterTask(); - void WaitTask(FNET_Task *task); - void PerformTasks(uint32_t slot, uint32_t iter); + void BeforeTask(std::unique_lock<std::mutex> &guard, FNET_Task *task); + void AfterTask(std::unique_lock<std::mutex> &guard); + void WaitTask(std::unique_lock<std::mutex> &guard, FNET_Task *task); + void PerformTasks(std::unique_lock<std::mutex> &guard, uint32_t slot, uint32_t iter); bool IsActive(FNET_Task *task); public: diff --git a/fnet/src/vespa/fnet/transport_thread.cpp b/fnet/src/vespa/fnet/transport_thread.cpp index 6610d217294..61fcc26f1c6 100644 --- a/fnet/src/vespa/fnet/transport_thread.cpp +++ b/fnet/src/vespa/fnet/transport_thread.cpp @@ -123,15 +123,16 @@ FNET_TransportThread::PostEvent(FNET_ControlPacket *cpacket, FNET_Context context) { bool wasEmpty; - Lock(); - if (_shutdown) { - Unlock(); - DiscardEvent(cpacket, context); - return false; + { + std::unique_lock<std::mutex> guard(_lock); + if (_shutdown) { + guard.unlock(); + DiscardEvent(cpacket, context); + return false; + } + wasEmpty = _queue.IsEmpty_NoLock(); + _queue.QueuePacket_NoLock(cpacket, context); } - wasEmpty = _queue.IsEmpty_NoLock(); - _queue.QueuePacket_NoLock(cpacket, context); - Unlock(); if (wasEmpty) { _selector.wakeup(); } @@ -168,13 +169,13 @@ FNET_TransportThread::UpdateStats() for (FNET_IOComponent *comp = _componentsHead; comp != nullptr; comp = comp->_ioc_next) { - comp->Lock(); + auto guard(comp->getGuard()); comp->FlushDirectWriteStats(); - comp->Unlock(); } - Lock(); - _stats.Update(&_counters, ms / 1000.0); - Unlock(); + { + std::unique_lock<std::mutex> guard(_lock); + _stats.Update(&_counters, ms / 1000.0); + } _counters.Clear(); if (_config._logStats) @@ -221,6 +222,7 @@ FNET_TransportThread::FNET_TransportThread(FNET_Transport &owner_in) _selector(), _queue(), _myQueue(), + _lock(), _cond(), _started(false), _shutdown(false), @@ -235,9 +237,10 @@ FNET_TransportThread::FNET_TransportThread(FNET_Transport &owner_in) FNET_TransportThread::~FNET_TransportThread() { - Lock(); - _deleted = true; - Unlock(); + { + std::unique_lock<std::mutex> guard(_lock); + _deleted = true; + } if (_started && !_finished) { LOG(error, "Transport: delete called on active object!"); } @@ -375,12 +378,13 @@ void FNET_TransportThread::ShutDown(bool waitFinished) { bool wasEmpty = false; - Lock(); - if (!_shutdown) { - _shutdown = true; - wasEmpty = _queue.IsEmpty_NoLock(); + { + std::unique_lock<std::mutex> guard(_lock); + if (!_shutdown) { + _shutdown = true; + wasEmpty = _queue.IsEmpty_NoLock(); + } } - Unlock(); if (wasEmpty) { _selector.wakeup(); } @@ -396,11 +400,10 @@ FNET_TransportThread::WaitFinished() if (_finished) return; - Lock(); + std::unique_lock<std::mutex> guard(_lock); _waitFinished = true; while (!_finished) - Wait(); - Unlock(); + _cond.wait(guard); } @@ -409,13 +412,14 @@ FNET_TransportThread::InitEventLoop() { bool wasStarted; bool wasDeleted; - Lock(); - wasStarted = _started; - wasDeleted = _deleted; - if (!_started && !_deleted) { - _started = true; + { + std::unique_lock<std::mutex> guard(_lock); + wasStarted = _started; + wasDeleted = _deleted; + if (!_started && !_deleted) { + _started = true; + } } - Unlock(); if (wasStarted) { LOG(error, "Transport: InitEventLoop: object already active!"); return false; @@ -435,9 +439,10 @@ FNET_TransportThread::InitEventLoop() void FNET_TransportThread::handle_wakeup() { - Lock(); - CountEvent(_queue.FlushPackets_NoLock(&_myQueue)); - Unlock(); + { + std::unique_lock<std::mutex> guard(_lock); + CountEvent(_queue.FlushPackets_NoLock(&_myQueue)); + } FNET_Context context; FNET_Packet *packet = nullptr; @@ -584,9 +589,10 @@ FNET_TransportThread::EventLoopIteration() _statsTask.Kill(); // flush event queue - Lock(); - _queue.FlushPackets_NoLock(&_myQueue); - Unlock(); + { + std::unique_lock<std::mutex> guard(_lock); + _queue.FlushPackets_NoLock(&_myQueue); + } // discard remaining events FNET_Context context; @@ -616,11 +622,13 @@ FNET_TransportThread::EventLoopIteration() _queue.IsEmpty_NoLock() && _myQueue.IsEmpty_NoLock()); - Lock(); - _finished = true; - if (_waitFinished) - Broadcast(); - Unlock(); + { + std::unique_lock<std::mutex> guard(_lock); + _finished = true; + if (_waitFinished) { + _cond.notify_all(); + } + } LOG(spam, "Transport: event loop finished."); diff --git a/fnet/src/vespa/fnet/transport_thread.h b/fnet/src/vespa/fnet/transport_thread.h index 6ec29968980..8af0642eebe 100644 --- a/fnet/src/vespa/fnet/transport_thread.h +++ b/fnet/src/vespa/fnet/transport_thread.h @@ -11,6 +11,8 @@ #include <vespa/fastos/time.h> #include <vespa/vespalib/net/socket_handle.h> #include <vespa/vespalib/net/selector.h> +#include <mutex> +#include <condition_variable> class FNET_Transport; class FNET_ControlPacket; @@ -63,7 +65,8 @@ private: Selector _selector; // I/O event generator FNET_PacketQueue_NoLock _queue; // outer event queue FNET_PacketQueue_NoLock _myQueue; // inner event queue - FastOS_Cond _cond; // used for synchronization + std::mutex _lock; // used for synchronization + std::condition_variable _cond; // used for synchronization bool _started; // event loop started ? bool _shutdown; // should stop event loop ? bool _finished; // event loop stopped ? @@ -76,30 +79,6 @@ private: /** - * Lock this object. - **/ - void Lock() { _cond.Lock(); } - - - /** - * Unlock this object. - **/ - void Unlock() { _cond.Unlock(); } - - - /** - * Wait on this object. - **/ - void Wait() { _cond.Wait(); } - - - /** - * Wake all waiting on this object. - **/ - void Broadcast() { _cond.Broadcast(); } - - - /** * Add an IOComponent to the list of components. This operation is * performed immidiately and without locking. This method should * only be called in the transport thread. |