summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTor Egge <Tor.Egge@broadpark.no>2017-10-27 17:46:26 +0000
committerGitHub <noreply@github.com>2017-10-27 17:46:26 +0000
commit86730de554af723ffd5e4d57b7e3e4d54cb61c0f (patch)
tree5f1966330a044776f90c2199d9d62e7301f03755
parentebdeac00bfe7c6682d258728708887437c0067f3 (diff)
parent54f460f1842cf4a202ce0358470c631d018e0451 (diff)
Merge pull request #3926 from vespa-engine/toregge/use-standard-locking-in-fnet
Toregge/use standard locking in fnet
-rw-r--r--fnet/src/tests/frt/rpc/invoke.cpp49
-rw-r--r--fnet/src/tests/frt/rpc/session.cpp15
-rw-r--r--fnet/src/tests/info/info.cpp8
-rw-r--r--fnet/src/tests/locking/drainpackets.cpp43
-rw-r--r--fnet/src/tests/locking/lockspeed.cpp82
-rw-r--r--fnet/src/vespa/fnet/connection.cpp159
-rw-r--r--fnet/src/vespa/fnet/connection.h15
-rw-r--r--fnet/src/vespa/fnet/frt/invoker.cpp19
-rw-r--r--fnet/src/vespa/fnet/frt/invoker.h6
-rw-r--r--fnet/src/vespa/fnet/iocomponent.cpp21
-rw-r--r--fnet/src/vespa/fnet/iocomponent.h44
-rw-r--r--fnet/src/vespa/fnet/packetqueue.cpp30
-rw-r--r--fnet/src/vespa/fnet/packetqueue.h43
-rw-r--r--fnet/src/vespa/fnet/scheduler.cpp77
-rw-r--r--fnet/src/vespa/fnet/scheduler.h19
-rw-r--r--fnet/src/vespa/fnet/transport_thread.cpp90
-rw-r--r--fnet/src/vespa/fnet/transport_thread.h29
17 files changed, 330 insertions, 419 deletions
diff --git a/fnet/src/tests/frt/rpc/invoke.cpp b/fnet/src/tests/frt/rpc/invoke.cpp
index 8f4f9dfca90..f44a58dd8b3 100644
--- a/fnet/src/tests/frt/rpc/invoke.cpp
+++ b/fnet/src/tests/frt/rpc/invoke.cpp
@@ -1,10 +1,12 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <vespa/vespalib/testkit/test_kit.h>
#include <vespa/fnet/frt/frt.h>
+#include <mutex>
+#include <condition_variable>
//-------------------------------------------------------------
-FastOS_Mutex _delayedReturnCntLock;
+std::mutex _delayedReturnCntLock;
uint32_t _delayedReturnCnt = 0;
uint32_t _phase_simple_cnt = 0;
@@ -20,10 +22,11 @@ uint32_t _phase_echo_cnt = 0;
struct LockedReqWait : public FRT_IRequestWait
{
- FastOS_Cond _cond; // cond used to signal req done
+ std::mutex _condLock; // cond used to signal req done
+ std::condition_variable _cond; // cond used to signal req done
bool _done; // flag indicating req done
- FastOS_Mutex _lockLock; // lock protecting virtual lock
+ std::mutex _lockLock; // lock protecting virtual lock
bool _lock; // virtual lock
bool _wasLocked; // was 'locked' when req done
@@ -31,38 +34,32 @@ struct LockedReqWait : public FRT_IRequestWait
~LockedReqWait() {}
void lock() {
- _lockLock.Lock();
+ std::lock_guard<std::mutex> guard(_lockLock);
_lock = true;
- _lockLock.Unlock();
}
void unlock() {
- _lockLock.Lock();
+ std::lock_guard<std::mutex> guard(_lockLock);
_lock = false;
- _lockLock.Unlock();
}
bool isLocked() {
- _lockLock.Lock();
- bool ret = _lock;
- _lockLock.Unlock();
- return ret;
+ std::lock_guard<std::mutex> guard(_lockLock);
+ return _lock;
}
void RequestDone(FRT_RPCRequest *) override {
_wasLocked = isLocked();
- _cond.Lock();
+ std::lock_guard<std::mutex> guard(_condLock);
_done = true;
- _cond.Signal();
- _cond.Unlock();
+ _cond.notify_one();
}
void waitReq() {
- _cond.Lock();
+ std::unique_lock<std::mutex> guard(_condLock);
while(!_done) {
- _cond.Wait();
+ _cond.wait(guard);
}
- _cond.Unlock();
}
};
@@ -81,18 +78,18 @@ public:
: FNET_Task(sched),
_req(req)
{
- _delayedReturnCntLock.Lock();
- _delayedReturnCnt++;
- _delayedReturnCntLock.Unlock();
+ {
+ std::lock_guard<std::mutex> guard(_delayedReturnCntLock);
+ _delayedReturnCnt++;
+ }
Schedule(delay);
}
void PerformTask() override
{
_req->Return();
- _delayedReturnCntLock.Lock();
+ std::lock_guard<std::mutex> guard(_delayedReturnCntLock);
_delayedReturnCnt--;
- _delayedReturnCntLock.Unlock();
}
};
@@ -547,9 +544,11 @@ State::WaitForDelayedReturnCount(uint32_t wantedCount, double timeout)
FastOS_Time timer;
timer.SetNow();
for (;;) {
- _delayedReturnCntLock.Lock();
- uint32_t delayedReturnCnt = _delayedReturnCnt;
- _delayedReturnCntLock.Unlock();
+ uint32_t delayedReturnCnt;
+ {
+ std::lock_guard<std::mutex> guard(_delayedReturnCntLock);
+ delayedReturnCnt = _delayedReturnCnt;
+ }
if (delayedReturnCnt == wantedCount) {
return true;
}
diff --git a/fnet/src/tests/frt/rpc/session.cpp b/fnet/src/tests/frt/rpc/session.cpp
index 0316129a8b5..2e920ac9f98 100644
--- a/fnet/src/tests/frt/rpc/session.cpp
+++ b/fnet/src/tests/frt/rpc/session.cpp
@@ -2,28 +2,27 @@
#include <vespa/vespalib/testkit/test_kit.h>
#include <vespa/fnet/frt/frt.h>
+#include <mutex>
class Session
{
private:
- static FastOS_Mutex _lock;
+ static std::mutex _lock;
static int _cnt;
int _val;
public:
Session() : _val(0)
{
- _lock.Lock();
- ++_cnt;
- _lock.Unlock();
+ std::lock_guard<std::mutex> guard(_lock);
+ ++_cnt;
}
~Session()
{
- _lock.Lock();
- --_cnt;
- _lock.Unlock();
+ std::lock_guard<std::mutex> guard(_lock);
+ --_cnt;
}
void SetValue(int val) { _val = val; }
@@ -31,7 +30,7 @@ public:
static int GetCnt() { return _cnt; }
};
-FastOS_Mutex Session::_lock;
+std::mutex Session::_lock;
int Session::_cnt(0);
diff --git a/fnet/src/tests/info/info.cpp b/fnet/src/tests/info/info.cpp
index ea8de81cff5..cd0364cad6f 100644
--- a/fnet/src/tests/info/info.cpp
+++ b/fnet/src/tests/info/info.cpp
@@ -70,16 +70,16 @@ TEST("info") {
TEST("size of important objects")
{
- EXPECT_EQUAL(192u, sizeof(FNET_IOComponent));
+ EXPECT_EQUAL(184u, sizeof(FNET_IOComponent));
EXPECT_EQUAL(32u, sizeof(FNET_Channel));
EXPECT_EQUAL(40u, sizeof(FNET_PacketQueue_NoLock));
- EXPECT_EQUAL(496u, sizeof(FNET_Connection));
- EXPECT_EQUAL(96u, sizeof(FastOS_Cond));
+ EXPECT_EQUAL(488u, sizeof(FNET_Connection));
+ EXPECT_EQUAL(48u, sizeof(std::condition_variable));
EXPECT_EQUAL(56u, sizeof(FNET_DataBuffer));
EXPECT_EQUAL(24u, sizeof(FastOS_Time));
EXPECT_EQUAL(8u, sizeof(FNET_Context));
EXPECT_EQUAL(8u, sizeof(fastos::TimeStamp));
- EXPECT_EQUAL(48u, sizeof(FastOS_Mutex));
+ EXPECT_EQUAL(40u, sizeof(std::mutex));
EXPECT_EQUAL(40u, sizeof(pthread_mutex_t));
EXPECT_EQUAL(48u, sizeof(pthread_cond_t));
EXPECT_EQUAL(40u, sizeof(std::mutex));
diff --git a/fnet/src/tests/locking/drainpackets.cpp b/fnet/src/tests/locking/drainpackets.cpp
index 05ba923376d..a2999f0bddc 100644
--- a/fnet/src/tests/locking/drainpackets.cpp
+++ b/fnet/src/tests/locking/drainpackets.cpp
@@ -1,6 +1,7 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <vespa/vespalib/testkit/test_kit.h>
#include <vespa/fnet/fnet.h>
+#include <mutex>
class MyPacket : public FNET_Packet
{
@@ -16,7 +17,7 @@ TEST("drain packets") {
FastOS_Time start;
FastOS_Time stop;
- FastOS_Mutex lock;
+ std::mutex lock;
FNET_PacketQueue q1(512);
FNET_PacketQueue q2(512);
@@ -39,25 +40,23 @@ TEST("drain packets") {
FNET_Packet *packet;
FNET_Context context;
- lock.Lock();
-
- while (!q1.IsEmpty_NoLock()) {
- packet = q1.DequeuePacket_NoLock(&context);
- q3.QueuePacket_NoLock(packet, context);
+ {
+ std::lock_guard<std::mutex> guard(lock);
+ while (!q1.IsEmpty_NoLock()) {
+ packet = q1.DequeuePacket_NoLock(&context);
+ q3.QueuePacket_NoLock(packet, context);
+ }
}
- lock.Unlock();
-
//------------------------
- lock.Lock();
-
- while (!q3.IsEmpty_NoLock()) {
- packet = q3.DequeuePacket_NoLock(&context);
- q1.QueuePacket_NoLock(packet, context);
+ {
+ std::lock_guard<std::mutex> guard(lock);
+ while (!q3.IsEmpty_NoLock()) {
+ packet = q3.DequeuePacket_NoLock(&context);
+ q1.QueuePacket_NoLock(packet, context);
+ }
}
-
- lock.Unlock();
}
stop.SetNow();
@@ -74,9 +73,10 @@ TEST("drain packets") {
FNET_Packet *packet;
FNET_Context context;
- lock.Lock();
- q1.FlushPackets_NoLock(&q2);
- lock.Unlock();
+ {
+ std::lock_guard<std::mutex> guard(lock);
+ q1.FlushPackets_NoLock(&q2);
+ }
while (!q2.IsEmpty_NoLock()) {
packet = q2.DequeuePacket_NoLock(&context);
@@ -85,9 +85,10 @@ TEST("drain packets") {
//------------------------
- lock.Lock();
- q3.FlushPackets_NoLock(&q2);
- lock.Unlock();
+ {
+ std::lock_guard<std::mutex> guard(lock);
+ q3.FlushPackets_NoLock(&q2);
+ }
while (!q2.IsEmpty_NoLock()) {
packet = q2.DequeuePacket_NoLock(&context);
diff --git a/fnet/src/tests/locking/lockspeed.cpp b/fnet/src/tests/locking/lockspeed.cpp
index 9932f6aa316..b85777f264c 100644
--- a/fnet/src/tests/locking/lockspeed.cpp
+++ b/fnet/src/tests/locking/lockspeed.cpp
@@ -7,7 +7,7 @@ TEST("lock speed") {
FastOS_Time start;
FastOS_Time stop;
DummyLock dummy;
- FastOS_Mutex lock;
+ std::mutex lock;
double dummyTime;
double actualTime;
double overhead;
@@ -46,26 +46,26 @@ TEST("lock speed") {
start.SetNow();
for (i = 0; i < 1000000; i++) {
- lock.Lock();
- lock.Unlock();
- lock.Lock();
- lock.Unlock();
- lock.Lock();
- lock.Unlock();
- lock.Lock();
- lock.Unlock();
- lock.Lock();
- lock.Unlock();
- lock.Lock();
- lock.Unlock();
- lock.Lock();
- lock.Unlock();
- lock.Lock();
- lock.Unlock();
- lock.Lock();
- lock.Unlock();
- lock.Lock();
- lock.Unlock();
+ lock.lock();
+ lock.unlock();
+ lock.lock();
+ lock.unlock();
+ lock.lock();
+ lock.unlock();
+ lock.lock();
+ lock.unlock();
+ lock.lock();
+ lock.unlock();
+ lock.lock();
+ lock.unlock();
+ lock.lock();
+ lock.unlock();
+ lock.lock();
+ lock.unlock();
+ lock.lock();
+ lock.unlock();
+ lock.lock();
+ lock.unlock();
}
stop.SetNow();
stop -= start;
@@ -85,16 +85,16 @@ TEST("lock speed") {
start.SetNow();
for (i = 0; i < 1000000; i++) {
- FastOS_Mutex lock0;
- FastOS_Mutex lock1;
- FastOS_Mutex lock2;
- FastOS_Mutex lock3;
- FastOS_Mutex lock4;
- FastOS_Mutex lock5;
- FastOS_Mutex lock6;
- FastOS_Mutex lock7;
- FastOS_Mutex lock8;
- FastOS_Mutex lock9;
+ std::mutex lock0;
+ std::mutex lock1;
+ std::mutex lock2;
+ std::mutex lock3;
+ std::mutex lock4;
+ std::mutex lock5;
+ std::mutex lock6;
+ std::mutex lock7;
+ std::mutex lock8;
+ std::mutex lock9;
}
stop.SetNow();
stop -= start;
@@ -105,16 +105,16 @@ TEST("lock speed") {
start.SetNow();
for (i = 0; i < 1000000; i++) {
- FastOS_Cond cond0;
- FastOS_Cond cond1;
- FastOS_Cond cond2;
- FastOS_Cond cond3;
- FastOS_Cond cond4;
- FastOS_Cond cond5;
- FastOS_Cond cond6;
- FastOS_Cond cond7;
- FastOS_Cond cond8;
- FastOS_Cond cond9;
+ std::condition_variable cond0;
+ std::condition_variable cond1;
+ std::condition_variable cond2;
+ std::condition_variable cond3;
+ std::condition_variable cond4;
+ std::condition_variable cond5;
+ std::condition_variable cond6;
+ std::condition_variable cond7;
+ std::condition_variable cond8;
+ std::condition_variable cond9;
}
stop.SetNow();
stop -= start;
diff --git a/fnet/src/vespa/fnet/connection.cpp b/fnet/src/vespa/fnet/connection.cpp
index 180e7ea9adf..f2a4ab3a23d 100644
--- a/fnet/src/vespa/fnet/connection.cpp
+++ b/fnet/src/vespa/fnet/connection.cpp
@@ -16,25 +16,26 @@ LOG_SETUP(".fnet");
namespace {
class SyncPacket : public FNET_DummyPacket {
private:
- FastOS_Cond _cond;
+ std::mutex _lock;
+ std::condition_variable _cond;
bool _done;
bool _waiting;
public:
SyncPacket()
- : _cond(),
+ : _lock(),
+ _cond(),
_done(false),
_waiting(false) {}
~SyncPacket() {}
void WaitFree() {
- _cond.Lock();
+ std::unique_lock<std::mutex> guard(_lock);
_waiting = true;
while (!_done)
- _cond.Wait();
+ _cond.wait(guard);
_waiting = false;
- _cond.Unlock();
}
void Free() override;
@@ -44,11 +45,11 @@ public:
void
SyncPacket::Free()
{
- _cond.Lock();
+ std::unique_lock<std::mutex> guard(_lock);
_done = true;
- if (_waiting)
- _cond.Signal();
- _cond.Unlock();
+ if (_waiting) {
+ _cond.notify_one();
+ }
}
}
@@ -100,7 +101,7 @@ FNET_Connection::SetState(State state)
State oldstate;
std::vector<FNET_Channel::UP> toDelete;
- Lock();
+ std::unique_lock<std::mutex> guard(_ioc_lock);
oldstate = _state;
_state = state;
if (LOG_WOULD_LOG(debug) && state != oldstate) {
@@ -112,22 +113,22 @@ FNET_Connection::SetState(State state)
if (_flags._writeLock) {
_flags._discarding = true;
while (_flags._writeLock)
- Wait();
+ _ioc_cond.wait(guard);
_flags._discarding = false;
}
while (!_queue.IsEmpty_NoLock() || !_myQueue.IsEmpty_NoLock()) {
_flags._discarding = true;
_queue.FlushPackets_NoLock(&_myQueue);
- Unlock();
+ guard.unlock();
_myQueue.DiscardPackets_NoLock();
- Lock();
+ guard.lock();
_flags._discarding = false;
}
- BeforeCallback(nullptr);
+ BeforeCallback(guard, nullptr);
toDelete = _channels.Broadcast(&FNET_ControlPacket::ChannelLost);
- AfterCallback();
+ AfterCallback(guard);
}
if ( ! toDelete.empty() ) {
@@ -141,7 +142,6 @@ FNET_Connection::SetState(State state)
}
}
}
- Unlock();
}
@@ -153,7 +153,7 @@ FNET_Connection::HandlePacket(uint32_t plen, uint32_t pcode,
FNET_Channel *channel;
FNET_IPacketHandler::HP_RetCode hp_rc;
- Lock();
+ std::unique_lock<std::mutex> guard(_ioc_lock);
channel = _channels.Lookup(chid);
if (channel != nullptr) { // deliver packet on open channel
@@ -161,12 +161,12 @@ FNET_Connection::HandlePacket(uint32_t plen, uint32_t pcode,
__builtin_prefetch(&_streamer);
__builtin_prefetch(&_input);
- BeforeCallback(channel);
+ BeforeCallback(guard, channel);
__builtin_prefetch(channel->GetHandler(), 0); // Prefetch the handler while packet is being decoded.
packet = _streamer->Decode(&_input, plen, pcode, channel->GetContext());
hp_rc = (packet != nullptr) ? channel->Receive(packet)
: channel->Receive(&FNET_ControlPacket::BadPacket);
- AfterCallback();
+ AfterCallback(guard);
FNET_Channel::UP toDelete;
if (hp_rc > FNET_IPacketHandler::FNET_KEEP_CHANNEL) {
@@ -181,21 +181,18 @@ FNET_Connection::HandlePacket(uint32_t plen, uint32_t pcode,
toDelete.reset(channel);
}
}
-
- Unlock();
-
} else if (CanAcceptChannels() && IsFromPeer(chid)) { // open new channel
FNET_Channel::UP newChannel(new FNET_Channel(chid, this));
channel = newChannel.get();
AddRef_NoLock();
- BeforeCallback(channel);
+ BeforeCallback(guard, channel);
if (_serverAdapter->InitChannel(channel, pcode)) {
packet = _streamer->Decode(&_input, plen, pcode, channel->GetContext());
hp_rc = (packet != nullptr) ? channel->Receive(packet)
: channel->Receive(&FNET_ControlPacket::BadPacket);
- AfterCallback();
+ AfterCallback(guard);
if (hp_rc == FNET_IPacketHandler::FNET_FREE_CHANNEL) {
SubRef_NoLock();
@@ -204,14 +201,11 @@ FNET_Connection::HandlePacket(uint32_t plen, uint32_t pcode,
} else {
newChannel.release(); // It has already been taken care of, so we should not free it here.
}
-
- Unlock();
-
} else {
- AfterCallback();
+ AfterCallback(guard);
SubRef_NoLock();
- Unlock();
+ guard.unlock();
LOG(debug, "Connection(%s): channel init failed", GetSpec());
_input.DataToDead(plen);
@@ -219,7 +213,7 @@ FNET_Connection::HandlePacket(uint32_t plen, uint32_t pcode,
} else { // skip unhandled packet
- Unlock();
+ guard.unlock();
LOG(spam, "Connection(%s): skipping unhandled packet", GetSpec());
_input.DataToDead(plen);
}
@@ -362,13 +356,14 @@ FNET_Connection::Write(bool direct)
}
}
- Lock();
+ std::unique_lock<std::mutex> guard(_ioc_lock);
_writeWork = _queue.GetPacketCnt_NoLock()
+ _myQueue.GetPacketCnt_NoLock()
+ ((_output.GetDataLen() > 0) ? 1 : 0);
_flags._writeLock = false;
- if (_flags._discarding)
- Broadcast();
+ if (_flags._discarding) {
+ _ioc_cond.notify_all();
+ }
bool writePending = (_writeWork > 0);
if (direct) { // direct write (from post packet)
@@ -378,17 +373,15 @@ FNET_Connection::Write(bool direct)
}
if (writePending) {
AddRef_NoLock();
- Unlock();
+ guard.unlock();
if (broken) {
Owner()->Close(this, /* needRef = */ false);
} else {
Owner()->EnableWrite(this, /* needRef = */ false);
}
- } else {
- Unlock();
}
} else { // normal write (from event loop)
- Unlock();
+ guard.unlock();
if (writtenData > 0) {
CountDataWrite(writtenData);
CountPacketWrite(writtenPackets);
@@ -546,18 +539,17 @@ FNET_Connection::OpenChannel(FNET_IPacketHandler *handler,
FNET_Channel::UP newChannel(new FNET_Channel(FNET_NOID, this, handler, context));
FNET_Channel * ret = nullptr;
- Lock();
+ std::unique_lock<std::mutex> guard(_ioc_lock);
if (__builtin_expect(_state < FNET_CLOSING, true)) {
newChannel->SetID(GetNextID());
if (chid != nullptr) {
*chid = newChannel->GetID();
}
- WaitCallback(nullptr);
+ WaitCallback(guard, nullptr);
AddRef_NoLock();
ret = newChannel.release();
_channels.Register(ret);
}
- Unlock();
return ret;
}
@@ -566,10 +558,12 @@ FNET_Channel*
FNET_Connection::OpenChannel()
{
- Lock();
- uint32_t chid = GetNextID();
- AddRef_NoLock();
- Unlock();
+ uint32_t chid;
+ {
+ std::unique_lock<std::mutex> guard(_ioc_lock);
+ chid = GetNextID();
+ AddRef_NoLock();
+ }
return new FNET_Channel(chid, this);
}
@@ -577,12 +571,9 @@ FNET_Connection::OpenChannel()
bool
FNET_Connection::CloseChannel(FNET_Channel *channel)
{
- bool ret;
- Lock();
- WaitCallback(channel);
- ret = _channels.Unregister(channel);
- Unlock();
- return ret;
+ std::unique_lock<std::mutex> guard(_ioc_lock);
+ WaitCallback(guard, channel);
+ return _channels.Unregister(channel);
}
@@ -590,18 +581,17 @@ void
FNET_Connection::FreeChannel(FNET_Channel *channel)
{
delete channel;
- Lock();
- SubRef_HasLock();
+ SubRef_HasLock(std::unique_lock<std::mutex>(_ioc_lock));
}
void
FNET_Connection::CloseAndFreeChannel(FNET_Channel *channel)
{
- Lock();
- WaitCallback(channel);
+ std::unique_lock<std::mutex> guard(_ioc_lock);
+ WaitCallback(guard, channel);
_channels.Unregister(channel);
- SubRef_HasLock();
+ SubRef_HasLock(std::move(guard));
delete channel;
}
@@ -609,17 +599,16 @@ FNET_Connection::CloseAndFreeChannel(FNET_Channel *channel)
void
FNET_Connection::CloseAdminChannel()
{
- Lock();
FNET_Channel::UP toDelete;
+ std::unique_lock<std::mutex> guard(_ioc_lock);
if (_adminChannel != nullptr) {
- WaitCallback(_adminChannel);
+ WaitCallback(guard, _adminChannel);
if (_adminChannel != nullptr) {
_channels.Unregister(_adminChannel);
toDelete.reset(_adminChannel);
_adminChannel = nullptr;
}
}
- Unlock();
}
@@ -629,13 +618,12 @@ FNET_Connection::PostPacket(FNET_Packet *packet, uint32_t chid)
uint32_t writeWork;
assert(packet != nullptr);
- Lock();
+ std::unique_lock<std::mutex> guard(_ioc_lock);
if (_state >= FNET_CLOSING) {
if (_flags._discarding) {
_queue.QueuePacket_NoLock(packet, FNET_Context(chid));
- Unlock();
} else {
- Unlock();
+ guard.unlock();
packet->Free(); // discard packet
}
return false; // connection is down
@@ -649,15 +637,13 @@ FNET_Connection::PostPacket(FNET_Packet *packet, uint32_t chid)
if (GetConfig()->_directWrite) {
_flags._writeLock = true;
_queue.FlushPackets_NoLock(&_myQueue);
- Unlock();
+ guard.unlock();
Write(true);
} else {
AddRef_NoLock();
- Unlock();
+ guard.unlock();
Owner()->EnableWrite(this, /* needRef = */ false);
}
- } else {
- Unlock();
}
return true;
}
@@ -666,11 +652,8 @@ FNET_Connection::PostPacket(FNET_Packet *packet, uint32_t chid)
uint32_t
FNET_Connection::GetQueueLen()
{
- Lock();
- uint32_t ret = _queue.GetPacketCnt_NoLock()
- + _myQueue.GetPacketCnt_NoLock();
- Unlock();
- return ret;
+ std::unique_lock<std::mutex> guard(_ioc_lock);
+ return _queue.GetPacketCnt_NoLock() + _myQueue.GetPacketCnt_NoLock();
}
@@ -725,6 +708,16 @@ FNET_Connection::HandleReadEvent()
bool
+FNET_Connection::writePendingAfterConnect()
+{
+ std::unique_lock<std::mutex> guard(_ioc_lock);
+ _state = FNET_CONNECTED; // SetState(FNET_CONNECTED)
+ LOG(debug, "Connection(%s): State transition: %s -> %s", GetSpec(),
+ GetStateString(FNET_CONNECTING), GetStateString(FNET_CONNECTED));
+ return (_writeWork > 0);
+}
+
+bool
FNET_Connection::HandleWriteEvent()
{
int error; // socket error code
@@ -734,14 +727,9 @@ FNET_Connection::HandleWriteEvent()
case FNET_CONNECTING:
error = _socket.get_so_error();
if (error == 0) { // connect ok
- Lock();
- _state = FNET_CONNECTED; // SetState(FNET_CONNECTED)
- LOG(debug, "Connection(%s): State transition: %s -> %s", GetSpec(),
- GetStateString(FNET_CONNECTING), GetStateString(FNET_CONNECTED));
- bool writePending = (_writeWork > 0);
- Unlock();
- if (!writePending)
+ if (!writePendingAfterConnect()) {
EnableWriteEvent(false);
+ }
} else {
LOG(debug, "Connection(%s): connect error: %d", GetSpec(), error);
@@ -750,15 +738,16 @@ FNET_Connection::HandleWriteEvent()
}
break;
case FNET_CONNECTED:
- Lock();
- if (_flags._writeLock) {
- Unlock();
- EnableWriteEvent(false);
- return true;
+ {
+ std::unique_lock<std::mutex> guard(_ioc_lock);
+ if (_flags._writeLock) {
+ guard.unlock();
+ EnableWriteEvent(false);
+ return true;
+ }
+ _flags._writeLock = true;
+ _queue.FlushPackets_NoLock(&_myQueue);
}
- _flags._writeLock = true;
- _queue.FlushPackets_NoLock(&_myQueue);
- Unlock();
broken = !Write(false);
break;
case FNET_CLOSING:
diff --git a/fnet/src/vespa/fnet/connection.h b/fnet/src/vespa/fnet/connection.h
index 699516554e2..ef883d84989 100644
--- a/fnet/src/vespa/fnet/connection.h
+++ b/fnet/src/vespa/fnet/connection.h
@@ -138,12 +138,12 @@ private:
* control packets all channels are callback
* targets at the same time.
**/
- void WaitCallback(FNET_Channel *channel)
+ void WaitCallback(std::unique_lock<std::mutex> &guard, FNET_Channel *channel)
{
while (_flags._inCallback
&& (_callbackTarget == channel || _callbackTarget == nullptr)) {
_flags._callbackWait = true;
- Wait();
+ _ioc_cond.wait(guard);
}
}
@@ -153,11 +153,11 @@ private:
* called. When this method returns the object will be in callback
* mode, but not locked.
**/
- void BeforeCallback(FNET_Channel *channel)
+ void BeforeCallback(std::unique_lock<std::mutex> &guard, FNET_Channel *channel)
{
_flags._inCallback = true;
_callbackTarget = channel;
- Unlock();
+ guard.unlock();
}
/**
@@ -166,13 +166,13 @@ private:
* called, but not locked. When this method returns the object is no
* longer in callback mode, but locked.
**/
- void AfterCallback()
+ void AfterCallback(std::unique_lock<std::mutex> &guard)
{
- Lock();
+ guard.lock();
_flags._inCallback = false;
if (_flags._callbackWait) {
_flags._callbackWait = false;
- Broadcast();
+ _ioc_cond.notify_all();
}
}
@@ -215,6 +215,7 @@ private:
**/
bool Write(bool direct);
+ bool writePendingAfterConnect();
public:
/**
diff --git a/fnet/src/vespa/fnet/frt/invoker.cpp b/fnet/src/vespa/fnet/frt/invoker.cpp
index 0cb364935df..5afc355c68f 100644
--- a/fnet/src/vespa/fnet/frt/invoker.cpp
+++ b/fnet/src/vespa/fnet/frt/invoker.cpp
@@ -8,7 +8,8 @@
LOG_SETUP(".fnet.frt.invoker");
FRT_SingleReqWait::FRT_SingleReqWait()
- : _cond(),
+ : _lock(),
+ _cond(),
_done(false),
_waiting(false)
{ }
@@ -18,12 +19,12 @@ FRT_SingleReqWait::~FRT_SingleReqWait() {}
void
FRT_SingleReqWait::WaitReq()
{
- _cond.Lock();
+ std::unique_lock<std::mutex> guard(_lock);
_waiting = true;
- while(!_done)
- _cond.Wait();
+ while(!_done) {
+ _cond.wait(guard);
+ }
_waiting = false;
- _cond.Unlock();
}
@@ -31,11 +32,11 @@ void
FRT_SingleReqWait::RequestDone(FRT_RPCRequest *req)
{
(void) req;
- _cond.Lock();
+ std::unique_lock<std::mutex> guard(_lock);
_done = true;
- if (_waiting)
- _cond.Signal();
- _cond.Unlock();
+ if (_waiting) {
+ _cond.notify_one();
+ }
}
diff --git a/fnet/src/vespa/fnet/frt/invoker.h b/fnet/src/vespa/fnet/frt/invoker.h
index 7716430f842..15d74017200 100644
--- a/fnet/src/vespa/fnet/frt/invoker.h
+++ b/fnet/src/vespa/fnet/frt/invoker.h
@@ -5,8 +5,9 @@
#include "rpcrequest.h"
#include <vespa/fnet/task.h>
#include <vespa/fnet/ipackethandler.h>
-#include <vespa/fastos/cond.h>
#include <vespa/fastos/thread.h>
+#include <mutex>
+#include <condition_variable>
class FRT_Method;
class FRT_Supervisor;
@@ -29,7 +30,8 @@ public:
class FRT_SingleReqWait : public FRT_IRequestWait
{
private:
- FastOS_Cond _cond;
+ std::mutex _lock;
+ std::condition_variable _cond;
bool _done;
bool _waiting;
diff --git a/fnet/src/vespa/fnet/iocomponent.cpp b/fnet/src/vespa/fnet/iocomponent.cpp
index e5544ceff0e..ec51c1f080e 100644
--- a/fnet/src/vespa/fnet/iocomponent.cpp
+++ b/fnet/src/vespa/fnet/iocomponent.cpp
@@ -18,6 +18,7 @@ FNET_IOComponent::FNET_IOComponent(FNET_TransportThread *owner,
_ioc_spec(nullptr),
_flags(shouldTimeOut),
_ioc_timestamp(fastos::ClockSystem::now()),
+ _ioc_lock(),
_ioc_cond(),
_ioc_refcnt(1),
_ioc_directPacketWriteCnt(0),
@@ -47,10 +48,9 @@ FNET_IOComponent::UpdateTimeOut() {
void
FNET_IOComponent::AddRef()
{
- Lock();
+ std::unique_lock<std::mutex> guard(_ioc_lock);
assert(_ioc_refcnt > 0);
_ioc_refcnt++;
- Unlock();
}
@@ -65,27 +65,26 @@ FNET_IOComponent::AddRef_NoLock()
void
FNET_IOComponent::SubRef()
{
- Lock();
- assert(_ioc_refcnt > 0);
- if (--_ioc_refcnt > 0) {
- Unlock();
- return;
+ {
+ std::unique_lock<std::mutex> guard(_ioc_lock);
+ assert(_ioc_refcnt > 0);
+ if (--_ioc_refcnt > 0) {
+ return;
+ }
}
- Unlock();
CleanupHook();
delete this;
}
void
-FNET_IOComponent::SubRef_HasLock()
+FNET_IOComponent::SubRef_HasLock(std::unique_lock<std::mutex> guard)
{
assert(_ioc_refcnt > 0);
if (--_ioc_refcnt > 0) {
- Unlock();
return;
}
- Unlock();
+ guard.unlock();
CleanupHook();
delete this;
}
diff --git a/fnet/src/vespa/fnet/iocomponent.h b/fnet/src/vespa/fnet/iocomponent.h
index d0d8cee85b7..a48930428c7 100644
--- a/fnet/src/vespa/fnet/iocomponent.h
+++ b/fnet/src/vespa/fnet/iocomponent.h
@@ -3,9 +3,10 @@
#pragma once
#include "stats.h"
-#include <vespa/fastos/cond.h>
#include <vespa/fastos/timestamp.h>
#include <vespa/vespalib/net/selector.h>
+#include <mutex>
+#include <condition_variable>
class FNET_TransportThread;
class FNET_StatCounters;
@@ -50,7 +51,8 @@ protected:
char *_ioc_spec; // connect/listen spec
Flags _flags; // Compressed representation of boolean flags;
fastos::TimeStamp _ioc_timestamp; // last I/O activity
- FastOS_Cond _ioc_cond; // synchronization
+ std::mutex _ioc_lock; // synchronization
+ std::condition_variable _ioc_cond; // synchronization
uint32_t _ioc_refcnt; // reference counter
// direct write stats kept locally
@@ -86,38 +88,10 @@ public:
**/
const char *GetSpec() const { return _ioc_spec; }
-
- /**
- * Lock object to gain exclusive access.
- **/
- void Lock() { _ioc_cond.Lock(); }
-
-
- /**
- * Unlock object to yield exclusive access.
- **/
- void Unlock() { _ioc_cond.Unlock(); }
-
-
- /**
- * Wait on this object. Caller should have lock on object.
- **/
- void Wait() { _ioc_cond.Wait(); }
-
-
- /**
- * Signal one thread waiting on this object. Caller should have
- * lock.
- **/
- void Signal() { _ioc_cond.Signal(); }
-
-
- /**
- * Signal all thread waiting on this object. Caller should have
- * lock.
- **/
- void Broadcast() { _ioc_cond.Broadcast(); }
-
+ /*
+ * Get a guard to gain exclusive access.
+ */
+ std::unique_lock<std::mutex> getGuard() { return std::unique_lock<std::mutex>(_ioc_lock); }
/**
* Allocate a reference to this component. This method locks the
@@ -145,7 +119,7 @@ public:
* protect the reference counter, but assumes that the lock has
* already been obtained when this method is called.
**/
- void SubRef_HasLock();
+ void SubRef_HasLock(std::unique_lock<std::mutex> guard);
/**
diff --git a/fnet/src/vespa/fnet/packetqueue.cpp b/fnet/src/vespa/fnet/packetqueue.cpp
index b2ed08d4b04..4fdb5842a9d 100644
--- a/fnet/src/vespa/fnet/packetqueue.cpp
+++ b/fnet/src/vespa/fnet/packetqueue.cpp
@@ -4,6 +4,7 @@
#include "packet.h"
#include <vespa/fastos/time.h>
#include <cassert>
+#include <chrono>
void
FNET_PacketQueue_NoLock::ExpandBuf(uint32_t needentries)
@@ -166,6 +167,7 @@ FNET_PacketQueue_NoLock::Print(uint32_t indent)
FNET_PacketQueue::FNET_PacketQueue(uint32_t len,
HP_RetCode hpRetCode)
: FNET_PacketQueue_NoLock(len, hpRetCode),
+ _lock(),
_cond(),
_waitCnt(0)
{
@@ -190,16 +192,16 @@ void
FNET_PacketQueue::QueuePacket(FNET_Packet *packet, FNET_Context context)
{
assert(packet != nullptr);
- Lock();
+ std::unique_lock<std::mutex> guard(_lock);
EnsureFree();
_buf[_in_pos]._packet = packet; // insert packet ref.
_buf[_in_pos]._context = context;
if (++_in_pos == _bufsize)
_in_pos = 0; // wrap around.
_bufused++;
- if (_waitCnt >= _bufused) // signal waiting thread(s)
- Signal();
- Unlock();
+ if (_waitCnt >= _bufused) { // signal waiting thread(s)
+ _cond.notify_one();
+ }
}
@@ -207,17 +209,17 @@ FNET_Packet*
FNET_PacketQueue::DequeuePacket(FNET_Context *context)
{
FNET_Packet *packet = nullptr;
- Lock();
+ std::unique_lock<std::mutex> guard(_lock);
_waitCnt++;
- while (_bufused == 0)
- Wait();
+ while (_bufused == 0) {
+ _cond.wait(guard);
+ }
_waitCnt--;
packet = _buf[_out_pos]._packet;
*context = _buf[_out_pos]._context;
if (++_out_pos == _bufsize)
_out_pos = 0; // wrap around
_bufused--;
- Unlock();
return packet;
}
@@ -231,14 +233,14 @@ FNET_PacketQueue::DequeuePacket(uint32_t maxwait, FNET_Context *context)
if (maxwait > 0)
startTime.SetNow();
- Lock();
+ std::unique_lock<std::mutex> guard(_lock);
if (maxwait > 0) {
bool timeout = false;
_waitCnt++;
- while ((_bufused == 0) && !timeout
- && (waitTime = (int)(maxwait - startTime.MilliSecsToNow())) > 0)
- timeout = !TimedWait(waitTime);
+ while ((_bufused == 0) && !timeout && (waitTime = (int)(maxwait - startTime.MilliSecsToNow())) > 0) {
+ timeout = _cond.wait_for(guard, std::chrono::milliseconds(waitTime)) == std::cv_status::timeout;
+ }
_waitCnt--;
}
if (_bufused > 0) {
@@ -248,7 +250,6 @@ FNET_PacketQueue::DequeuePacket(uint32_t maxwait, FNET_Context *context)
_out_pos = 0; // wrap around
_bufused--;
}
- Unlock();
return packet;
}
@@ -256,7 +257,7 @@ FNET_PacketQueue::DequeuePacket(uint32_t maxwait, FNET_Context *context)
void
FNET_PacketQueue::Print(uint32_t indent)
{
- Lock();
+ std::unique_lock<std::mutex> guard(_lock);
uint32_t i = _out_pos;
uint32_t cnt = _bufused;
@@ -273,5 +274,4 @@ FNET_PacketQueue::Print(uint32_t indent)
_buf[i]._context.Print(indent + 2);
}
printf("%*s}\n", indent, "");
- Unlock();
}
diff --git a/fnet/src/vespa/fnet/packetqueue.h b/fnet/src/vespa/fnet/packetqueue.h
index 099410ed385..1daca901120 100644
--- a/fnet/src/vespa/fnet/packetqueue.h
+++ b/fnet/src/vespa/fnet/packetqueue.h
@@ -3,7 +3,8 @@
#pragma once
#include "ipackethandler.h"
-#include <vespa/fastos/cond.h>
+#include <mutex>
+#include <condition_variable>
/**
* This class implements a queue of packets. Being in a queue does not
@@ -174,8 +175,9 @@ private:
protected:
- FastOS_Cond _cond;
- uint32_t _waitCnt;
+ std::mutex _lock;
+ std::condition_variable _cond;
+ uint32_t _waitCnt;
public:
@@ -190,41 +192,6 @@ public:
FNET_PacketQueue(uint32_t len = 64, HP_RetCode hpRetCode = FNET_KEEP_CHANNEL);
~FNET_PacketQueue();
-
- /**
- * Lock this queue to gain exclusive access.
- **/
- void Lock() { _cond.Lock(); }
-
-
- /**
- * Unlock this queue to yield exclusive access.
- **/
- void Unlock() { _cond.Unlock(); }
-
-
- /**
- * Wait for a signal on this queue.
- **/
- void Wait() { _cond.Wait(); }
-
-
- /**
- * Wait for a signal on this queue, but time out after ms
- * milliseconds.
- *
- * @return true(got signal)/false(timeout).
- * @param ms number of milliseconds to wait before timing out.
- **/
- bool TimedWait(int ms) { return _cond.TimedWait(ms); }
-
-
- /**
- * Signal one thread waiting on this queue.
- **/
- void Signal() { _cond.Signal(); }
-
-
/**
* Handle incoming packet by putting it on the queue. This method
* uses the hpRetCode value given to the constructor to decide what
diff --git a/fnet/src/vespa/fnet/scheduler.cpp b/fnet/src/vespa/fnet/scheduler.cpp
index 6e0fc3b8255..ef67407cb44 100644
--- a/fnet/src/vespa/fnet/scheduler.cpp
+++ b/fnet/src/vespa/fnet/scheduler.cpp
@@ -39,23 +39,24 @@ FNET_Scheduler::~FNET_Scheduler()
if (LOG_WOULD_LOG(debug)) {
bool empty = true;
std::stringstream dump;
- Lock();
- dump << "FNET_Scheduler {" << std::endl;
- dump << " [slot=" << _currSlot << "][iter=" << _currIter << "]" << std::endl;
- for (int i = 0; i <= NUM_SLOTS; i++) {
- FNET_Task *pt = _slots[i];
- if (pt != nullptr) {
- empty = false;
- FNET_Task *end = pt;
- do {
- dump << " FNET_Task { slot=" << pt->_task_slot;
- dump << ", iter=" << pt->_task_iter << " }" << std::endl;
- pt = pt->_task_next;
- } while (pt != end);
+ {
+ std::unique_lock<std::mutex> guard(_lock);
+ dump << "FNET_Scheduler {" << std::endl;
+ dump << " [slot=" << _currSlot << "][iter=" << _currIter << "]" << std::endl;
+ for (int i = 0; i <= NUM_SLOTS; i++) {
+ FNET_Task *pt = _slots[i];
+ if (pt != nullptr) {
+ empty = false;
+ FNET_Task *end = pt;
+ do {
+ dump << " FNET_Task { slot=" << pt->_task_slot;
+ dump << ", iter=" << pt->_task_iter << " }" << std::endl;
+ pt = pt->_task_next;
+ } while (pt != end);
+ }
}
+ dump << "}" << std::endl;
}
- dump << "}" << std::endl;
- Unlock();
if (!empty) {
LOG(debug, "~FNET_Scheduler(): tasks still pending when deleted"
"\n%s", dump.str().c_str());
@@ -69,7 +70,7 @@ FNET_Scheduler::Schedule(FNET_Task *task, double seconds)
{
uint32_t ticks = 1 + (uint32_t) (seconds * (1000 / SLOT_TICK) + 0.5);
- Lock();
+ std::unique_lock<std::mutex> guard(_lock);
if (!task->_killed) {
if (IsActive(task))
LinkOut(task);
@@ -77,14 +78,13 @@ FNET_Scheduler::Schedule(FNET_Task *task, double seconds)
task->_task_iter = _currIter + ((ticks + _currSlot) >> SLOTS_SHIFT);
LinkIn(task);
}
- Unlock();
}
void
FNET_Scheduler::ScheduleNow(FNET_Task *task)
{
- Lock();
+ std::unique_lock<std::mutex> guard(_lock);
if (!task->_killed) {
if (IsActive(task))
LinkOut(task);
@@ -92,37 +92,34 @@ FNET_Scheduler::ScheduleNow(FNET_Task *task)
task->_task_iter = 0;
LinkIn(task);
}
- Unlock();
}
void
FNET_Scheduler::Unschedule(FNET_Task *task)
{
- Lock();
- WaitTask(task);
+ std::unique_lock<std::mutex> guard(_lock);
+ WaitTask(guard, task);
if (IsActive(task))
LinkOut(task);
- Unlock();
}
void
FNET_Scheduler::Kill(FNET_Task *task)
{
- Lock();
- WaitTask(task);
+ std::unique_lock<std::mutex> guard(_lock);
+ WaitTask(guard, task);
if (IsActive(task))
LinkOut(task);
task->_killed = true;
- Unlock();
}
void
FNET_Scheduler::Print(FILE *dst)
{
- Lock();
+ std::unique_lock<std::mutex> guard(_lock);
fprintf(dst, "FNET_Scheduler {\n");
fprintf(dst, " [slot=%d][iter=%d]\n", _currSlot, _currIter);
for (int i = 0; i <= NUM_SLOTS; i++) {
@@ -137,7 +134,6 @@ FNET_Scheduler::Print(FILE *dst)
}
}
fprintf(dst, "}\n");
- Unlock();
}
@@ -155,11 +151,11 @@ FNET_Scheduler::CheckTasks()
if (_slots[NUM_SLOTS] == nullptr && _now < _next)
return;
- Lock();
+ std::unique_lock<std::mutex> guard(_lock);
// perform urgent tasks
- PerformTasks(NUM_SLOTS, 0);
+ PerformTasks(guard, NUM_SLOTS, 0);
// handle bucket timeout(s)
@@ -169,10 +165,9 @@ FNET_Scheduler::CheckTasks()
_currSlot = 0;
_currIter++;
}
- PerformTasks(_currSlot, _currIter);
+ PerformTasks(guard, _currSlot, _currIter);
}
}
- Unlock();
}
void
@@ -237,40 +232,40 @@ FNET_Scheduler::LinkOut(FNET_Task *task) {
}
void
-FNET_Scheduler::BeforeTask(FNET_Task *task) {
+FNET_Scheduler::BeforeTask(std::unique_lock<std::mutex> &guard, FNET_Task *task) {
_performing = task;
- Unlock();
+ guard.unlock();
}
void
-FNET_Scheduler::AfterTask() {
- Lock();
+FNET_Scheduler::AfterTask(std::unique_lock<std::mutex> &guard) {
+ guard.lock();
_performing = nullptr;
if (_waitTask) {
_waitTask = false;
- Broadcast();
+ _cond.notify_all();
}
}
void
-FNET_Scheduler::WaitTask(FNET_Task *task) {
+FNET_Scheduler::WaitTask(std::unique_lock<std::mutex> &guard, FNET_Task *task) {
while (IsPerforming(task)) {
_waitTask = true;
- Wait();
+ _cond.wait(guard);
}
}
void
-FNET_Scheduler::PerformTasks(uint32_t slot, uint32_t iter) {
+FNET_Scheduler::PerformTasks(std::unique_lock<std::mutex> &guard, uint32_t slot, uint32_t iter) {
FirstTask(slot);
for (FNET_Task *task; (task = GetTask()) != nullptr; ) {
NextTask();
if (task->_task_iter == iter) {
LinkOut(task);
- BeforeTask(task);
+ BeforeTask(guard, task);
task->PerformTask(); // PERFORM TASK
- AfterTask();
+ AfterTask(guard);
}
}
}
diff --git a/fnet/src/vespa/fnet/scheduler.h b/fnet/src/vespa/fnet/scheduler.h
index 68238fdbedc..82f0de068c1 100644
--- a/fnet/src/vespa/fnet/scheduler.h
+++ b/fnet/src/vespa/fnet/scheduler.h
@@ -3,7 +3,8 @@
#pragma once
#include <vespa/fastos/time.h>
-#include <vespa/fastos/cond.h>
+#include <mutex>
+#include <condition_variable>
class FNET_Task;
@@ -27,7 +28,8 @@ public:
};
private:
- FastOS_Cond _cond;
+ std::mutex _lock;
+ std::condition_variable _cond;
FNET_Task *_slots[NUM_SLOTS + 1];
FastOS_Time _next;
FastOS_Time _now;
@@ -42,11 +44,6 @@ private:
FNET_Scheduler(const FNET_Scheduler &);
FNET_Scheduler &operator=(const FNET_Scheduler &);
- void Lock() { _cond.Lock(); }
- void Unlock() { _cond.Unlock(); }
- void Wait() { _cond.Wait(); }
- void Broadcast() { _cond.Broadcast(); }
-
FNET_Task *GetTask() { return _currPt; }
void FirstTask(uint32_t slot);
@@ -56,10 +53,10 @@ private:
void LinkIn(FNET_Task *task);
void LinkOut(FNET_Task *task);
bool IsPerforming(FNET_Task *task) { return task == _performing; }
- void BeforeTask(FNET_Task *task);
- void AfterTask();
- void WaitTask(FNET_Task *task);
- void PerformTasks(uint32_t slot, uint32_t iter);
+ void BeforeTask(std::unique_lock<std::mutex> &guard, FNET_Task *task);
+ void AfterTask(std::unique_lock<std::mutex> &guard);
+ void WaitTask(std::unique_lock<std::mutex> &guard, FNET_Task *task);
+ void PerformTasks(std::unique_lock<std::mutex> &guard, uint32_t slot, uint32_t iter);
bool IsActive(FNET_Task *task);
public:
diff --git a/fnet/src/vespa/fnet/transport_thread.cpp b/fnet/src/vespa/fnet/transport_thread.cpp
index 6610d217294..61fcc26f1c6 100644
--- a/fnet/src/vespa/fnet/transport_thread.cpp
+++ b/fnet/src/vespa/fnet/transport_thread.cpp
@@ -123,15 +123,16 @@ FNET_TransportThread::PostEvent(FNET_ControlPacket *cpacket,
FNET_Context context)
{
bool wasEmpty;
- Lock();
- if (_shutdown) {
- Unlock();
- DiscardEvent(cpacket, context);
- return false;
+ {
+ std::unique_lock<std::mutex> guard(_lock);
+ if (_shutdown) {
+ guard.unlock();
+ DiscardEvent(cpacket, context);
+ return false;
+ }
+ wasEmpty = _queue.IsEmpty_NoLock();
+ _queue.QueuePacket_NoLock(cpacket, context);
}
- wasEmpty = _queue.IsEmpty_NoLock();
- _queue.QueuePacket_NoLock(cpacket, context);
- Unlock();
if (wasEmpty) {
_selector.wakeup();
}
@@ -168,13 +169,13 @@ FNET_TransportThread::UpdateStats()
for (FNET_IOComponent *comp = _componentsHead;
comp != nullptr; comp = comp->_ioc_next)
{
- comp->Lock();
+ auto guard(comp->getGuard());
comp->FlushDirectWriteStats();
- comp->Unlock();
}
- Lock();
- _stats.Update(&_counters, ms / 1000.0);
- Unlock();
+ {
+ std::unique_lock<std::mutex> guard(_lock);
+ _stats.Update(&_counters, ms / 1000.0);
+ }
_counters.Clear();
if (_config._logStats)
@@ -221,6 +222,7 @@ FNET_TransportThread::FNET_TransportThread(FNET_Transport &owner_in)
_selector(),
_queue(),
_myQueue(),
+ _lock(),
_cond(),
_started(false),
_shutdown(false),
@@ -235,9 +237,10 @@ FNET_TransportThread::FNET_TransportThread(FNET_Transport &owner_in)
FNET_TransportThread::~FNET_TransportThread()
{
- Lock();
- _deleted = true;
- Unlock();
+ {
+ std::unique_lock<std::mutex> guard(_lock);
+ _deleted = true;
+ }
if (_started && !_finished) {
LOG(error, "Transport: delete called on active object!");
}
@@ -375,12 +378,13 @@ void
FNET_TransportThread::ShutDown(bool waitFinished)
{
bool wasEmpty = false;
- Lock();
- if (!_shutdown) {
- _shutdown = true;
- wasEmpty = _queue.IsEmpty_NoLock();
+ {
+ std::unique_lock<std::mutex> guard(_lock);
+ if (!_shutdown) {
+ _shutdown = true;
+ wasEmpty = _queue.IsEmpty_NoLock();
+ }
}
- Unlock();
if (wasEmpty) {
_selector.wakeup();
}
@@ -396,11 +400,10 @@ FNET_TransportThread::WaitFinished()
if (_finished)
return;
- Lock();
+ std::unique_lock<std::mutex> guard(_lock);
_waitFinished = true;
while (!_finished)
- Wait();
- Unlock();
+ _cond.wait(guard);
}
@@ -409,13 +412,14 @@ FNET_TransportThread::InitEventLoop()
{
bool wasStarted;
bool wasDeleted;
- Lock();
- wasStarted = _started;
- wasDeleted = _deleted;
- if (!_started && !_deleted) {
- _started = true;
+ {
+ std::unique_lock<std::mutex> guard(_lock);
+ wasStarted = _started;
+ wasDeleted = _deleted;
+ if (!_started && !_deleted) {
+ _started = true;
+ }
}
- Unlock();
if (wasStarted) {
LOG(error, "Transport: InitEventLoop: object already active!");
return false;
@@ -435,9 +439,10 @@ FNET_TransportThread::InitEventLoop()
void
FNET_TransportThread::handle_wakeup()
{
- Lock();
- CountEvent(_queue.FlushPackets_NoLock(&_myQueue));
- Unlock();
+ {
+ std::unique_lock<std::mutex> guard(_lock);
+ CountEvent(_queue.FlushPackets_NoLock(&_myQueue));
+ }
FNET_Context context;
FNET_Packet *packet = nullptr;
@@ -584,9 +589,10 @@ FNET_TransportThread::EventLoopIteration()
_statsTask.Kill();
// flush event queue
- Lock();
- _queue.FlushPackets_NoLock(&_myQueue);
- Unlock();
+ {
+ std::unique_lock<std::mutex> guard(_lock);
+ _queue.FlushPackets_NoLock(&_myQueue);
+ }
// discard remaining events
FNET_Context context;
@@ -616,11 +622,13 @@ FNET_TransportThread::EventLoopIteration()
_queue.IsEmpty_NoLock() &&
_myQueue.IsEmpty_NoLock());
- Lock();
- _finished = true;
- if (_waitFinished)
- Broadcast();
- Unlock();
+ {
+ std::unique_lock<std::mutex> guard(_lock);
+ _finished = true;
+ if (_waitFinished) {
+ _cond.notify_all();
+ }
+ }
LOG(spam, "Transport: event loop finished.");
diff --git a/fnet/src/vespa/fnet/transport_thread.h b/fnet/src/vespa/fnet/transport_thread.h
index 6ec29968980..8af0642eebe 100644
--- a/fnet/src/vespa/fnet/transport_thread.h
+++ b/fnet/src/vespa/fnet/transport_thread.h
@@ -11,6 +11,8 @@
#include <vespa/fastos/time.h>
#include <vespa/vespalib/net/socket_handle.h>
#include <vespa/vespalib/net/selector.h>
+#include <mutex>
+#include <condition_variable>
class FNET_Transport;
class FNET_ControlPacket;
@@ -63,7 +65,8 @@ private:
Selector _selector; // I/O event generator
FNET_PacketQueue_NoLock _queue; // outer event queue
FNET_PacketQueue_NoLock _myQueue; // inner event queue
- FastOS_Cond _cond; // used for synchronization
+ std::mutex _lock; // used for synchronization
+ std::condition_variable _cond; // used for synchronization
bool _started; // event loop started ?
bool _shutdown; // should stop event loop ?
bool _finished; // event loop stopped ?
@@ -76,30 +79,6 @@ private:
/**
- * Lock this object.
- **/
- void Lock() { _cond.Lock(); }
-
-
- /**
- * Unlock this object.
- **/
- void Unlock() { _cond.Unlock(); }
-
-
- /**
- * Wait on this object.
- **/
- void Wait() { _cond.Wait(); }
-
-
- /**
- * Wake all waiting on this object.
- **/
- void Broadcast() { _cond.Broadcast(); }
-
-
- /**
* Add an IOComponent to the list of components. This operation is
* performed immidiately and without locking. This method should
* only be called in the transport thread.