summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTor Egge <Tor.Egge@oath.com>2017-10-27 12:00:08 +0000
committerTor Egge <Tor.Egge@oath.com>2017-10-27 12:00:08 +0000
commit0dd33f6fac0cdc3e0dc23d6a857f34663f093b84 (patch)
tree103e9be6cb7b9c103be37f0a508c89f1b6a89cc2
parent9d64263c147c862d7cd9017673d8208db751e781 (diff)
Use std::mutex and std::condition_variable instead of FastOS_Cond.
-rw-r--r--fnet/src/tests/info/info.cpp4
-rw-r--r--fnet/src/vespa/fnet/connection.cpp130
-rw-r--r--fnet/src/vespa/fnet/connection.h14
-rw-r--r--fnet/src/vespa/fnet/iocomponent.cpp21
-rw-r--r--fnet/src/vespa/fnet/iocomponent.h44
-rw-r--r--fnet/src/vespa/fnet/transport_thread.cpp3
6 files changed, 90 insertions, 126 deletions
diff --git a/fnet/src/tests/info/info.cpp b/fnet/src/tests/info/info.cpp
index ea8de81cff5..f15c8f83d80 100644
--- a/fnet/src/tests/info/info.cpp
+++ b/fnet/src/tests/info/info.cpp
@@ -70,10 +70,10 @@ TEST("info") {
TEST("size of important objects")
{
- EXPECT_EQUAL(192u, sizeof(FNET_IOComponent));
+ EXPECT_EQUAL(184u, sizeof(FNET_IOComponent));
EXPECT_EQUAL(32u, sizeof(FNET_Channel));
EXPECT_EQUAL(40u, sizeof(FNET_PacketQueue_NoLock));
- EXPECT_EQUAL(496u, sizeof(FNET_Connection));
+ EXPECT_EQUAL(488u, sizeof(FNET_Connection));
EXPECT_EQUAL(96u, sizeof(FastOS_Cond));
EXPECT_EQUAL(56u, sizeof(FNET_DataBuffer));
EXPECT_EQUAL(24u, sizeof(FastOS_Time));
diff --git a/fnet/src/vespa/fnet/connection.cpp b/fnet/src/vespa/fnet/connection.cpp
index bdaeeefd1ae..9fd3f52cc09 100644
--- a/fnet/src/vespa/fnet/connection.cpp
+++ b/fnet/src/vespa/fnet/connection.cpp
@@ -101,7 +101,7 @@ FNET_Connection::SetState(State state)
State oldstate;
std::vector<FNET_Channel::UP> toDelete;
- Lock();
+ std::unique_lock<std::mutex> guard(_ioc_lock);
oldstate = _state;
_state = state;
if (LOG_WOULD_LOG(debug) && state != oldstate) {
@@ -113,22 +113,22 @@ FNET_Connection::SetState(State state)
if (_flags._writeLock) {
_flags._discarding = true;
while (_flags._writeLock)
- Wait();
+ _ioc_cond.wait(guard);
_flags._discarding = false;
}
while (!_queue.IsEmpty_NoLock() || !_myQueue.IsEmpty_NoLock()) {
_flags._discarding = true;
_queue.FlushPackets_NoLock(&_myQueue);
- Unlock();
+ guard.unlock();
_myQueue.DiscardPackets_NoLock();
- Lock();
+ guard.lock();
_flags._discarding = false;
}
- BeforeCallback(nullptr);
+ BeforeCallback(guard, nullptr);
toDelete = _channels.Broadcast(&FNET_ControlPacket::ChannelLost);
- AfterCallback();
+ AfterCallback(guard);
}
if ( ! toDelete.empty() ) {
@@ -142,7 +142,6 @@ FNET_Connection::SetState(State state)
}
}
}
- Unlock();
}
@@ -154,7 +153,7 @@ FNET_Connection::HandlePacket(uint32_t plen, uint32_t pcode,
FNET_Channel *channel;
FNET_IPacketHandler::HP_RetCode hp_rc;
- Lock();
+ std::unique_lock<std::mutex> guard(_ioc_lock);
channel = _channels.Lookup(chid);
if (channel != nullptr) { // deliver packet on open channel
@@ -162,12 +161,12 @@ FNET_Connection::HandlePacket(uint32_t plen, uint32_t pcode,
__builtin_prefetch(&_streamer);
__builtin_prefetch(&_input);
- BeforeCallback(channel);
+ BeforeCallback(guard, channel);
__builtin_prefetch(channel->GetHandler(), 0); // Prefetch the handler while packet is being decoded.
packet = _streamer->Decode(&_input, plen, pcode, channel->GetContext());
hp_rc = (packet != nullptr) ? channel->Receive(packet)
: channel->Receive(&FNET_ControlPacket::BadPacket);
- AfterCallback();
+ AfterCallback(guard);
FNET_Channel::UP toDelete;
if (hp_rc > FNET_IPacketHandler::FNET_KEEP_CHANNEL) {
@@ -182,21 +181,18 @@ FNET_Connection::HandlePacket(uint32_t plen, uint32_t pcode,
toDelete.reset(channel);
}
}
-
- Unlock();
-
} else if (CanAcceptChannels() && IsFromPeer(chid)) { // open new channel
FNET_Channel::UP newChannel(new FNET_Channel(chid, this));
channel = newChannel.get();
AddRef_NoLock();
- BeforeCallback(channel);
+ BeforeCallback(guard, channel);
if (_serverAdapter->InitChannel(channel, pcode)) {
packet = _streamer->Decode(&_input, plen, pcode, channel->GetContext());
hp_rc = (packet != nullptr) ? channel->Receive(packet)
: channel->Receive(&FNET_ControlPacket::BadPacket);
- AfterCallback();
+ AfterCallback(guard);
if (hp_rc == FNET_IPacketHandler::FNET_FREE_CHANNEL) {
SubRef_NoLock();
@@ -205,14 +201,11 @@ FNET_Connection::HandlePacket(uint32_t plen, uint32_t pcode,
} else {
newChannel.release(); // It has already been taken care of, so we should not free it here.
}
-
- Unlock();
-
} else {
- AfterCallback();
+ AfterCallback(guard);
SubRef_NoLock();
- Unlock();
+ guard.unlock();
LOG(debug, "Connection(%s): channel init failed", GetSpec());
_input.DataToDead(plen);
@@ -220,7 +213,7 @@ FNET_Connection::HandlePacket(uint32_t plen, uint32_t pcode,
} else { // skip unhandled packet
- Unlock();
+ guard.unlock();
LOG(spam, "Connection(%s): skipping unhandled packet", GetSpec());
_input.DataToDead(plen);
}
@@ -363,13 +356,14 @@ FNET_Connection::Write(bool direct)
}
}
- Lock();
+ std::unique_lock<std::mutex> guard(_ioc_lock);
_writeWork = _queue.GetPacketCnt_NoLock()
+ _myQueue.GetPacketCnt_NoLock()
+ ((_output.GetDataLen() > 0) ? 1 : 0);
_flags._writeLock = false;
- if (_flags._discarding)
- Broadcast();
+ if (_flags._discarding) {
+ _ioc_cond.notify_all();
+ }
bool writePending = (_writeWork > 0);
if (direct) { // direct write (from post packet)
@@ -379,17 +373,15 @@ FNET_Connection::Write(bool direct)
}
if (writePending) {
AddRef_NoLock();
- Unlock();
+ guard.unlock();
if (broken) {
Owner()->Close(this, /* needRef = */ false);
} else {
Owner()->EnableWrite(this, /* needRef = */ false);
}
- } else {
- Unlock();
}
} else { // normal write (from event loop)
- Unlock();
+ guard.unlock();
if (writtenData > 0) {
CountDataWrite(writtenData);
CountPacketWrite(writtenPackets);
@@ -547,18 +539,17 @@ FNET_Connection::OpenChannel(FNET_IPacketHandler *handler,
FNET_Channel::UP newChannel(new FNET_Channel(FNET_NOID, this, handler, context));
FNET_Channel * ret = nullptr;
- Lock();
+ std::unique_lock<std::mutex> guard(_ioc_lock);
if (__builtin_expect(_state < FNET_CLOSING, true)) {
newChannel->SetID(GetNextID());
if (chid != nullptr) {
*chid = newChannel->GetID();
}
- WaitCallback(nullptr);
+ WaitCallback(guard, nullptr);
AddRef_NoLock();
ret = newChannel.release();
_channels.Register(ret);
}
- Unlock();
return ret;
}
@@ -567,10 +558,12 @@ FNET_Channel*
FNET_Connection::OpenChannel()
{
- Lock();
- uint32_t chid = GetNextID();
- AddRef_NoLock();
- Unlock();
+ uint32_t chid;
+ {
+ std::unique_lock<std::mutex> guard(_ioc_lock);
+ chid = GetNextID();
+ AddRef_NoLock();
+ }
return new FNET_Channel(chid, this);
}
@@ -579,10 +572,9 @@ bool
FNET_Connection::CloseChannel(FNET_Channel *channel)
{
bool ret;
- Lock();
- WaitCallback(channel);
+ std::unique_lock<std::mutex> guard(_ioc_lock);
+ WaitCallback(guard, channel);
ret = _channels.Unregister(channel);
- Unlock();
return ret;
}
@@ -591,18 +583,18 @@ void
FNET_Connection::FreeChannel(FNET_Channel *channel)
{
delete channel;
- Lock();
- SubRef_HasLock();
+ std::unique_lock<std::mutex> guard(_ioc_lock);
+ SubRef_HasLock(std::move(guard));
}
void
FNET_Connection::CloseAndFreeChannel(FNET_Channel *channel)
{
- Lock();
- WaitCallback(channel);
+ std::unique_lock<std::mutex> guard(_ioc_lock);
+ WaitCallback(guard, channel);
_channels.Unregister(channel);
- SubRef_HasLock();
+ SubRef_HasLock(std::move(guard));
delete channel;
}
@@ -610,17 +602,16 @@ FNET_Connection::CloseAndFreeChannel(FNET_Channel *channel)
void
FNET_Connection::CloseAdminChannel()
{
- Lock();
FNET_Channel::UP toDelete;
+ std::unique_lock<std::mutex> guard(_ioc_lock);
if (_adminChannel != nullptr) {
- WaitCallback(_adminChannel);
+ WaitCallback(guard, _adminChannel);
if (_adminChannel != nullptr) {
_channels.Unregister(_adminChannel);
toDelete.reset(_adminChannel);
_adminChannel = nullptr;
}
}
- Unlock();
}
@@ -630,13 +621,12 @@ FNET_Connection::PostPacket(FNET_Packet *packet, uint32_t chid)
uint32_t writeWork;
assert(packet != nullptr);
- Lock();
+ std::unique_lock<std::mutex> guard(_ioc_lock);
if (_state >= FNET_CLOSING) {
if (_flags._discarding) {
_queue.QueuePacket_NoLock(packet, FNET_Context(chid));
- Unlock();
} else {
- Unlock();
+ guard.unlock();
packet->Free(); // discard packet
}
return false; // connection is down
@@ -650,15 +640,13 @@ FNET_Connection::PostPacket(FNET_Packet *packet, uint32_t chid)
if (GetConfig()->_directWrite) {
_flags._writeLock = true;
_queue.FlushPackets_NoLock(&_myQueue);
- Unlock();
+ guard.unlock();
Write(true);
} else {
AddRef_NoLock();
- Unlock();
+ guard.unlock();
Owner()->EnableWrite(this, /* needRef = */ false);
}
- } else {
- Unlock();
}
return true;
}
@@ -667,10 +655,9 @@ FNET_Connection::PostPacket(FNET_Packet *packet, uint32_t chid)
uint32_t
FNET_Connection::GetQueueLen()
{
- Lock();
+ std::unique_lock<std::mutex> guard(_ioc_lock);
uint32_t ret = _queue.GetPacketCnt_NoLock()
+ _myQueue.GetPacketCnt_NoLock();
- Unlock();
return ret;
}
@@ -735,12 +722,14 @@ FNET_Connection::HandleWriteEvent()
case FNET_CONNECTING:
error = _socket.get_so_error();
if (error == 0) { // connect ok
- Lock();
- _state = FNET_CONNECTED; // SetState(FNET_CONNECTED)
- LOG(debug, "Connection(%s): State transition: %s -> %s", GetSpec(),
- GetStateString(FNET_CONNECTING), GetStateString(FNET_CONNECTED));
- bool writePending = (_writeWork > 0);
- Unlock();
+ bool writePending;
+ {
+ std::unique_lock<std::mutex> guard(_ioc_lock);
+ _state = FNET_CONNECTED; // SetState(FNET_CONNECTED)
+ LOG(debug, "Connection(%s): State transition: %s -> %s", GetSpec(),
+ GetStateString(FNET_CONNECTING), GetStateString(FNET_CONNECTED));
+ writePending = (_writeWork > 0);
+ }
if (!writePending)
EnableWriteEvent(false);
} else {
@@ -751,17 +740,20 @@ FNET_Connection::HandleWriteEvent()
}
break;
case FNET_CONNECTED:
- Lock();
- if (_flags._writeLock) {
- Unlock();
- EnableWriteEvent(false);
- return true;
+ {
+ {
+ std::unique_lock<std::mutex> guard(_ioc_lock);
+ if (_flags._writeLock) {
+ guard.unlock();
+ EnableWriteEvent(false);
+ return true;
+ }
+ _flags._writeLock = true;
+ _queue.FlushPackets_NoLock(&_myQueue);
}
- _flags._writeLock = true;
- _queue.FlushPackets_NoLock(&_myQueue);
- Unlock();
broken = !Write(false);
break;
+ }
case FNET_CLOSING:
case FNET_CLOSED:
default:
diff --git a/fnet/src/vespa/fnet/connection.h b/fnet/src/vespa/fnet/connection.h
index 699516554e2..1126f8bf1e6 100644
--- a/fnet/src/vespa/fnet/connection.h
+++ b/fnet/src/vespa/fnet/connection.h
@@ -138,12 +138,12 @@ private:
* control packets all channels are callback
* targets at the same time.
**/
- void WaitCallback(FNET_Channel *channel)
+ void WaitCallback(std::unique_lock<std::mutex> &guard, FNET_Channel *channel)
{
while (_flags._inCallback
&& (_callbackTarget == channel || _callbackTarget == nullptr)) {
_flags._callbackWait = true;
- Wait();
+ _ioc_cond.wait(guard);
}
}
@@ -153,11 +153,11 @@ private:
* called. When this method returns the object will be in callback
* mode, but not locked.
**/
- void BeforeCallback(FNET_Channel *channel)
+ void BeforeCallback(std::unique_lock<std::mutex> &guard, FNET_Channel *channel)
{
_flags._inCallback = true;
_callbackTarget = channel;
- Unlock();
+ guard.unlock();
}
/**
@@ -166,13 +166,13 @@ private:
* called, but not locked. When this method returns the object is no
* longer in callback mode, but locked.
**/
- void AfterCallback()
+ void AfterCallback(std::unique_lock<std::mutex> &guard)
{
- Lock();
+ guard.lock();
_flags._inCallback = false;
if (_flags._callbackWait) {
_flags._callbackWait = false;
- Broadcast();
+ _ioc_cond.notify_all();
}
}
diff --git a/fnet/src/vespa/fnet/iocomponent.cpp b/fnet/src/vespa/fnet/iocomponent.cpp
index e5544ceff0e..ec51c1f080e 100644
--- a/fnet/src/vespa/fnet/iocomponent.cpp
+++ b/fnet/src/vespa/fnet/iocomponent.cpp
@@ -18,6 +18,7 @@ FNET_IOComponent::FNET_IOComponent(FNET_TransportThread *owner,
_ioc_spec(nullptr),
_flags(shouldTimeOut),
_ioc_timestamp(fastos::ClockSystem::now()),
+ _ioc_lock(),
_ioc_cond(),
_ioc_refcnt(1),
_ioc_directPacketWriteCnt(0),
@@ -47,10 +48,9 @@ FNET_IOComponent::UpdateTimeOut() {
void
FNET_IOComponent::AddRef()
{
- Lock();
+ std::unique_lock<std::mutex> guard(_ioc_lock);
assert(_ioc_refcnt > 0);
_ioc_refcnt++;
- Unlock();
}
@@ -65,27 +65,26 @@ FNET_IOComponent::AddRef_NoLock()
void
FNET_IOComponent::SubRef()
{
- Lock();
- assert(_ioc_refcnt > 0);
- if (--_ioc_refcnt > 0) {
- Unlock();
- return;
+ {
+ std::unique_lock<std::mutex> guard(_ioc_lock);
+ assert(_ioc_refcnt > 0);
+ if (--_ioc_refcnt > 0) {
+ return;
+ }
}
- Unlock();
CleanupHook();
delete this;
}
void
-FNET_IOComponent::SubRef_HasLock()
+FNET_IOComponent::SubRef_HasLock(std::unique_lock<std::mutex> guard)
{
assert(_ioc_refcnt > 0);
if (--_ioc_refcnt > 0) {
- Unlock();
return;
}
- Unlock();
+ guard.unlock();
CleanupHook();
delete this;
}
diff --git a/fnet/src/vespa/fnet/iocomponent.h b/fnet/src/vespa/fnet/iocomponent.h
index d0d8cee85b7..a48930428c7 100644
--- a/fnet/src/vespa/fnet/iocomponent.h
+++ b/fnet/src/vespa/fnet/iocomponent.h
@@ -3,9 +3,10 @@
#pragma once
#include "stats.h"
-#include <vespa/fastos/cond.h>
#include <vespa/fastos/timestamp.h>
#include <vespa/vespalib/net/selector.h>
+#include <mutex>
+#include <condition_variable>
class FNET_TransportThread;
class FNET_StatCounters;
@@ -50,7 +51,8 @@ protected:
char *_ioc_spec; // connect/listen spec
Flags _flags; // Compressed representation of boolean flags;
fastos::TimeStamp _ioc_timestamp; // last I/O activity
- FastOS_Cond _ioc_cond; // synchronization
+ std::mutex _ioc_lock; // synchronization
+ std::condition_variable _ioc_cond; // synchronization
uint32_t _ioc_refcnt; // reference counter
// direct write stats kept locally
@@ -86,38 +88,10 @@ public:
**/
const char *GetSpec() const { return _ioc_spec; }
-
- /**
- * Lock object to gain exclusive access.
- **/
- void Lock() { _ioc_cond.Lock(); }
-
-
- /**
- * Unlock object to yield exclusive access.
- **/
- void Unlock() { _ioc_cond.Unlock(); }
-
-
- /**
- * Wait on this object. Caller should have lock on object.
- **/
- void Wait() { _ioc_cond.Wait(); }
-
-
- /**
- * Signal one thread waiting on this object. Caller should have
- * lock.
- **/
- void Signal() { _ioc_cond.Signal(); }
-
-
- /**
- * Signal all thread waiting on this object. Caller should have
- * lock.
- **/
- void Broadcast() { _ioc_cond.Broadcast(); }
-
+ /*
+ * Get a guard to gain exclusive access.
+ */
+ std::unique_lock<std::mutex> getGuard() { return std::unique_lock<std::mutex>(_ioc_lock); }
/**
* Allocate a reference to this component. This method locks the
@@ -145,7 +119,7 @@ public:
* protect the reference counter, but assumes that the lock has
* already been obtained when this method is called.
**/
- void SubRef_HasLock();
+ void SubRef_HasLock(std::unique_lock<std::mutex> guard);
/**
diff --git a/fnet/src/vespa/fnet/transport_thread.cpp b/fnet/src/vespa/fnet/transport_thread.cpp
index bd3ad3c53ee..61fcc26f1c6 100644
--- a/fnet/src/vespa/fnet/transport_thread.cpp
+++ b/fnet/src/vespa/fnet/transport_thread.cpp
@@ -169,9 +169,8 @@ FNET_TransportThread::UpdateStats()
for (FNET_IOComponent *comp = _componentsHead;
comp != nullptr; comp = comp->_ioc_next)
{
- comp->Lock();
+ auto guard(comp->getGuard());
comp->FlushDirectWriteStats();
- comp->Unlock();
}
{
std::unique_lock<std::mutex> guard(_lock);