aboutsummaryrefslogtreecommitdiffstats
path: root/fnet
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2022-02-21 17:50:17 +0100
committerGitHub <noreply@github.com>2022-02-21 17:50:17 +0100
commita7e8bb9dcf3c674a3756e0f0383384593856415a (patch)
tree3944389e6b3d0e5b0ef7992808a3ca1ff24ff260 /fnet
parentf67ad6e4bdb5cf4b834428c61bc18953d9efd761 (diff)
parenteddc91fb205d4bc8e68aa72be86ed39a199728b5 (diff)
Merge pull request #21285 from vespa-engine/vekterli/more-threading-fixes
More miscellaneous threading fixes [run-systemtest]
Diffstat (limited to 'fnet')
-rw-r--r--fnet/src/tests/frt/rpc/detach_return_invoke.cpp17
-rw-r--r--fnet/src/vespa/fnet/connection.cpp20
-rw-r--r--fnet/src/vespa/fnet/connection.h6
-rw-r--r--fnet/src/vespa/fnet/transport_thread.cpp10
-rw-r--r--fnet/src/vespa/fnet/transport_thread.h6
5 files changed, 32 insertions, 27 deletions
diff --git a/fnet/src/tests/frt/rpc/detach_return_invoke.cpp b/fnet/src/tests/frt/rpc/detach_return_invoke.cpp
index e3c238cf633..17c38ab6e3a 100644
--- a/fnet/src/tests/frt/rpc/detach_return_invoke.cpp
+++ b/fnet/src/tests/frt/rpc/detach_return_invoke.cpp
@@ -5,15 +5,16 @@
#include <vespa/fnet/frt/rpcrequest.h>
#include <vespa/fnet/frt/invoker.h>
#include <vespa/vespalib/util/stringfmt.h>
+#include <atomic>
#include <thread>
struct Receptor : public FRT_IRequestWait
{
- FRT_RPCRequest *req;
+ std::atomic<FRT_RPCRequest*> req;
- Receptor() : req(0) {}
+ Receptor() : req(nullptr) {}
void RequestDone(FRT_RPCRequest *r) override {
- req = r;
+ req.store(r);
}
};
@@ -55,18 +56,18 @@ TEST("detach return invoke") {
target->InvokeSync(req, 5.0);
EXPECT_TRUE(!req->IsError());
for (uint32_t i = 0; i < 1000; ++i) {
- if (receptor.req != 0) {
+ if (receptor.req.load() != nullptr) {
break;
}
std::this_thread::sleep_for(10ms);
}
req->SubRef();
target->SubRef();
- if (receptor.req != 0) {
- EXPECT_TRUE(!receptor.req->IsError());
- receptor.req->SubRef();
+ if (receptor.req.load() != nullptr) {
+ EXPECT_TRUE(!receptor.req.load()->IsError());
+ receptor.req.load()->SubRef();
}
- EXPECT_TRUE(receptor.req != 0);
+ EXPECT_TRUE(receptor.req.load() != nullptr);
};
TEST_MAIN() { TEST_RUN_ALL(); }
diff --git a/fnet/src/vespa/fnet/connection.cpp b/fnet/src/vespa/fnet/connection.cpp
index 83224178e15..e3259db848b 100644
--- a/fnet/src/vespa/fnet/connection.cpp
+++ b/fnet/src/vespa/fnet/connection.cpp
@@ -125,8 +125,8 @@ FNET_Connection::SetState(State state)
std::vector<FNET_Channel::UP> toDelete;
std::unique_lock<std::mutex> guard(_ioc_lock);
- oldstate = _state;
- _state = state;
+ oldstate = GetState();
+ _state.store(state, std::memory_order_relaxed);
if (LOG_WOULD_LOG(debug) && state != oldstate) {
LOG(debug, "Connection(%s): State transition: %s -> %s", GetSpec(),
GetStateString(oldstate), GetStateString(state));
@@ -570,7 +570,7 @@ FNET_Connection::Init()
}
// handle close by admin channel init
- if (_state == FNET_CLOSED) {
+ if (GetState() == FNET_CLOSED) {
return false;
}
@@ -600,7 +600,7 @@ FNET_Connection::handle_handshake_act()
{
assert(_flags._handshake_work_pending);
_flags._handshake_work_pending = false;
- return ((_state == FNET_CONNECTING) && handshake());
+ return ((GetState() == FNET_CONNECTING) && handshake());
}
void
@@ -619,7 +619,7 @@ FNET_Connection::OpenChannel(FNET_IPacketHandler *handler,
FNET_Channel * ret = nullptr;
std::unique_lock<std::mutex> guard(_ioc_lock);
- if (__builtin_expect(_state < FNET_CLOSING, true)) {
+ if (__builtin_expect(GetState() < FNET_CLOSING, true)) {
newChannel->SetID(GetNextID());
if (chid != nullptr) {
*chid = newChannel->GetID();
@@ -698,7 +698,7 @@ FNET_Connection::PostPacket(FNET_Packet *packet, uint32_t chid)
assert(packet != nullptr);
std::unique_lock<std::mutex> guard(_ioc_lock);
- if (_state >= FNET_CLOSING) {
+ if (GetState() >= FNET_CLOSING) {
if (_flags._discarding) {
_queue.QueuePacket_NoLock(packet, FNET_Context(chid));
} else {
@@ -710,7 +710,7 @@ FNET_Connection::PostPacket(FNET_Packet *packet, uint32_t chid)
writeWork = _writeWork;
_writeWork++;
_queue.QueuePacket_NoLock(packet, FNET_Context(chid));
- if ((writeWork == 0) && (_state == FNET_CONNECTED)) {
+ if ((writeWork == 0) && (GetState() == FNET_CONNECTED)) {
AddRef_NoLock();
guard.unlock();
Owner()->EnableWrite(this, /* needRef = */ false);
@@ -756,7 +756,7 @@ FNET_Connection::HandleReadEvent()
{
bool broken = false; // is connection broken ?
- switch(_state) {
+ switch(GetState()) {
case FNET_CONNECTING:
broken = !handshake();
break;
@@ -776,7 +776,7 @@ bool
FNET_Connection::writePendingAfterConnect()
{
std::lock_guard<std::mutex> guard(_ioc_lock);
- _state = FNET_CONNECTED; // SetState(FNET_CONNECTED)
+ _state.store(FNET_CONNECTED, std::memory_order_relaxed); // SetState(FNET_CONNECTED)
LOG(debug, "Connection(%s): State transition: %s -> %s", GetSpec(),
GetStateString(FNET_CONNECTING), GetStateString(FNET_CONNECTED));
return (_writeWork > 0);
@@ -787,7 +787,7 @@ FNET_Connection::HandleWriteEvent()
{
bool broken = false; // is connection broken ?
- switch(_state) {
+ switch(GetState()) {
case FNET_CONNECTING:
broken = !handshake();
break;
diff --git a/fnet/src/vespa/fnet/connection.h b/fnet/src/vespa/fnet/connection.h
index e86b670b7e5..af4483e23e5 100644
--- a/fnet/src/vespa/fnet/connection.h
+++ b/fnet/src/vespa/fnet/connection.h
@@ -100,7 +100,7 @@ private:
vespalib::CryptoSocket::UP _socket; // socket for this conn
ResolveHandlerSP _resolve_handler; // async resolve callback
FNET_Context _context; // connection context
- State _state; // connection state
+ std::atomic<State> _state; // connection state. May be polled outside lock
Flags _flags; // Packed flags.
uint32_t _packetLength; // packet length
uint32_t _packetCode; // packet code
@@ -339,9 +339,9 @@ public:
/**
- * @return current connection state.
+ * @return current connection state. May be stale if read outside lock.
**/
- State GetState() { return _state; }
+ State GetState() const noexcept { return _state.load(std::memory_order_relaxed); }
/**
diff --git a/fnet/src/vespa/fnet/transport_thread.cpp b/fnet/src/vespa/fnet/transport_thread.cpp
index 53f8fea40cd..1d55ac3b327 100644
--- a/fnet/src/vespa/fnet/transport_thread.cpp
+++ b/fnet/src/vespa/fnet/transport_thread.cpp
@@ -236,7 +236,7 @@ FNET_TransportThread::~FNET_TransportThread()
{
std::lock_guard<std::mutex> guard(_shutdownLock);
}
- if (_started.load() && !_finished) {
+ if (_started.load() && !is_finished()) {
LOG(error, "Transport: delete called on active object!");
} else {
std::lock_guard guard(_pseudo_thread);
@@ -380,11 +380,11 @@ FNET_TransportThread::ShutDown(bool waitFinished)
void
FNET_TransportThread::WaitFinished()
{
- if (_finished)
+ if (is_finished())
return;
std::unique_lock<std::mutex> guard(_shutdownLock);
- while (!_finished)
+ while (!is_finished())
_shutdownCond.wait(guard);
}
@@ -501,7 +501,7 @@ FNET_TransportThread::EventLoopIteration() {
if (!IsShutDown())
return true;
- if (_finished)
+ if (is_finished())
return false;
endEventLoop();
@@ -557,7 +557,7 @@ FNET_TransportThread::endEventLoop() {
{
std::lock_guard<std::mutex> guard(_shutdownLock);
- _finished = true;
+ _finished.store(true, std::memory_order_relaxed);
_shutdownCond.notify_all();
}
diff --git a/fnet/src/vespa/fnet/transport_thread.h b/fnet/src/vespa/fnet/transport_thread.h
index cf1f68a8b39..dfdcf4e1970 100644
--- a/fnet/src/vespa/fnet/transport_thread.h
+++ b/fnet/src/vespa/fnet/transport_thread.h
@@ -50,7 +50,7 @@ private:
std::recursive_mutex _pseudo_thread; // used after transport thread has shut down
std::atomic<bool> _started; // event loop started ?
std::atomic<bool> _shutdown; // should stop event loop ?
- bool _finished; // event loop stopped ?
+ std::atomic<bool> _finished; // event loop stopped ?
/**
* Add an IOComponent to the list of components. This operation is
@@ -173,6 +173,10 @@ private:
return _shutdown.load(std::memory_order_relaxed);
}
+ [[nodiscard]] bool is_finished() const noexcept {
+ return _finished.load(std::memory_order_relaxed);
+ }
+
public:
FNET_TransportThread(const FNET_TransportThread &) = delete;
FNET_TransportThread &operator=(const FNET_TransportThread &) = delete;