diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2022-02-21 17:50:17 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-02-21 17:50:17 +0100 |
commit | a7e8bb9dcf3c674a3756e0f0383384593856415a (patch) | |
tree | 3944389e6b3d0e5b0ef7992808a3ca1ff24ff260 /fnet | |
parent | f67ad6e4bdb5cf4b834428c61bc18953d9efd761 (diff) | |
parent | eddc91fb205d4bc8e68aa72be86ed39a199728b5 (diff) |
Merge pull request #21285 from vespa-engine/vekterli/more-threading-fixes
More miscellaneous threading fixes [run-systemtest]
Diffstat (limited to 'fnet')
-rw-r--r-- | fnet/src/tests/frt/rpc/detach_return_invoke.cpp | 17 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/connection.cpp | 20 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/connection.h | 6 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/transport_thread.cpp | 10 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/transport_thread.h | 6 |
5 files changed, 32 insertions, 27 deletions
diff --git a/fnet/src/tests/frt/rpc/detach_return_invoke.cpp b/fnet/src/tests/frt/rpc/detach_return_invoke.cpp index e3c238cf633..17c38ab6e3a 100644 --- a/fnet/src/tests/frt/rpc/detach_return_invoke.cpp +++ b/fnet/src/tests/frt/rpc/detach_return_invoke.cpp @@ -5,15 +5,16 @@ #include <vespa/fnet/frt/rpcrequest.h> #include <vespa/fnet/frt/invoker.h> #include <vespa/vespalib/util/stringfmt.h> +#include <atomic> #include <thread> struct Receptor : public FRT_IRequestWait { - FRT_RPCRequest *req; + std::atomic<FRT_RPCRequest*> req; - Receptor() : req(0) {} + Receptor() : req(nullptr) {} void RequestDone(FRT_RPCRequest *r) override { - req = r; + req.store(r); } }; @@ -55,18 +56,18 @@ TEST("detach return invoke") { target->InvokeSync(req, 5.0); EXPECT_TRUE(!req->IsError()); for (uint32_t i = 0; i < 1000; ++i) { - if (receptor.req != 0) { + if (receptor.req.load() != nullptr) { break; } std::this_thread::sleep_for(10ms); } req->SubRef(); target->SubRef(); - if (receptor.req != 0) { - EXPECT_TRUE(!receptor.req->IsError()); - receptor.req->SubRef(); + if (receptor.req.load() != nullptr) { + EXPECT_TRUE(!receptor.req.load()->IsError()); + receptor.req.load()->SubRef(); } - EXPECT_TRUE(receptor.req != 0); + EXPECT_TRUE(receptor.req.load() != nullptr); }; TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/fnet/src/vespa/fnet/connection.cpp b/fnet/src/vespa/fnet/connection.cpp index 83224178e15..e3259db848b 100644 --- a/fnet/src/vespa/fnet/connection.cpp +++ b/fnet/src/vespa/fnet/connection.cpp @@ -125,8 +125,8 @@ FNET_Connection::SetState(State state) std::vector<FNET_Channel::UP> toDelete; std::unique_lock<std::mutex> guard(_ioc_lock); - oldstate = _state; - _state = state; + oldstate = GetState(); + _state.store(state, std::memory_order_relaxed); if (LOG_WOULD_LOG(debug) && state != oldstate) { LOG(debug, "Connection(%s): State transition: %s -> %s", GetSpec(), GetStateString(oldstate), GetStateString(state)); @@ -570,7 +570,7 @@ FNET_Connection::Init() } // handle close by admin channel init - if (_state == FNET_CLOSED) { + if (GetState() == FNET_CLOSED) { return false; } @@ -600,7 +600,7 @@ FNET_Connection::handle_handshake_act() { assert(_flags._handshake_work_pending); _flags._handshake_work_pending = false; - return ((_state == FNET_CONNECTING) && handshake()); + return ((GetState() == FNET_CONNECTING) && handshake()); } void @@ -619,7 +619,7 @@ FNET_Connection::OpenChannel(FNET_IPacketHandler *handler, FNET_Channel * ret = nullptr; std::unique_lock<std::mutex> guard(_ioc_lock); - if (__builtin_expect(_state < FNET_CLOSING, true)) { + if (__builtin_expect(GetState() < FNET_CLOSING, true)) { newChannel->SetID(GetNextID()); if (chid != nullptr) { *chid = newChannel->GetID(); @@ -698,7 +698,7 @@ FNET_Connection::PostPacket(FNET_Packet *packet, uint32_t chid) assert(packet != nullptr); std::unique_lock<std::mutex> guard(_ioc_lock); - if (_state >= FNET_CLOSING) { + if (GetState() >= FNET_CLOSING) { if (_flags._discarding) { _queue.QueuePacket_NoLock(packet, FNET_Context(chid)); } else { @@ -710,7 +710,7 @@ FNET_Connection::PostPacket(FNET_Packet *packet, uint32_t chid) writeWork = _writeWork; _writeWork++; _queue.QueuePacket_NoLock(packet, FNET_Context(chid)); - if ((writeWork == 0) && (_state == FNET_CONNECTED)) { + if ((writeWork == 0) && (GetState() == FNET_CONNECTED)) { AddRef_NoLock(); guard.unlock(); Owner()->EnableWrite(this, /* needRef = */ false); @@ -756,7 +756,7 @@ FNET_Connection::HandleReadEvent() { bool broken = false; // is connection broken ? - switch(_state) { + switch(GetState()) { case FNET_CONNECTING: broken = !handshake(); break; @@ -776,7 +776,7 @@ bool FNET_Connection::writePendingAfterConnect() { std::lock_guard<std::mutex> guard(_ioc_lock); - _state = FNET_CONNECTED; // SetState(FNET_CONNECTED) + _state.store(FNET_CONNECTED, std::memory_order_relaxed); // SetState(FNET_CONNECTED) LOG(debug, "Connection(%s): State transition: %s -> %s", GetSpec(), GetStateString(FNET_CONNECTING), GetStateString(FNET_CONNECTED)); return (_writeWork > 0); @@ -787,7 +787,7 @@ FNET_Connection::HandleWriteEvent() { bool broken = false; // is connection broken ? - switch(_state) { + switch(GetState()) { case FNET_CONNECTING: broken = !handshake(); break; diff --git a/fnet/src/vespa/fnet/connection.h b/fnet/src/vespa/fnet/connection.h index e86b670b7e5..af4483e23e5 100644 --- a/fnet/src/vespa/fnet/connection.h +++ b/fnet/src/vespa/fnet/connection.h @@ -100,7 +100,7 @@ private: vespalib::CryptoSocket::UP _socket; // socket for this conn ResolveHandlerSP _resolve_handler; // async resolve callback FNET_Context _context; // connection context - State _state; // connection state + std::atomic<State> _state; // connection state. May be polled outside lock Flags _flags; // Packed flags. uint32_t _packetLength; // packet length uint32_t _packetCode; // packet code @@ -339,9 +339,9 @@ public: /** - * @return current connection state. + * @return current connection state. May be stale if read outside lock. **/ - State GetState() { return _state; } + State GetState() const noexcept { return _state.load(std::memory_order_relaxed); } /** diff --git a/fnet/src/vespa/fnet/transport_thread.cpp b/fnet/src/vespa/fnet/transport_thread.cpp index 53f8fea40cd..1d55ac3b327 100644 --- a/fnet/src/vespa/fnet/transport_thread.cpp +++ b/fnet/src/vespa/fnet/transport_thread.cpp @@ -236,7 +236,7 @@ FNET_TransportThread::~FNET_TransportThread() { std::lock_guard<std::mutex> guard(_shutdownLock); } - if (_started.load() && !_finished) { + if (_started.load() && !is_finished()) { LOG(error, "Transport: delete called on active object!"); } else { std::lock_guard guard(_pseudo_thread); @@ -380,11 +380,11 @@ FNET_TransportThread::ShutDown(bool waitFinished) void FNET_TransportThread::WaitFinished() { - if (_finished) + if (is_finished()) return; std::unique_lock<std::mutex> guard(_shutdownLock); - while (!_finished) + while (!is_finished()) _shutdownCond.wait(guard); } @@ -501,7 +501,7 @@ FNET_TransportThread::EventLoopIteration() { if (!IsShutDown()) return true; - if (_finished) + if (is_finished()) return false; endEventLoop(); @@ -557,7 +557,7 @@ FNET_TransportThread::endEventLoop() { { std::lock_guard<std::mutex> guard(_shutdownLock); - _finished = true; + _finished.store(true, std::memory_order_relaxed); _shutdownCond.notify_all(); } diff --git a/fnet/src/vespa/fnet/transport_thread.h b/fnet/src/vespa/fnet/transport_thread.h index cf1f68a8b39..dfdcf4e1970 100644 --- a/fnet/src/vespa/fnet/transport_thread.h +++ b/fnet/src/vespa/fnet/transport_thread.h @@ -50,7 +50,7 @@ private: std::recursive_mutex _pseudo_thread; // used after transport thread has shut down std::atomic<bool> _started; // event loop started ? std::atomic<bool> _shutdown; // should stop event loop ? - bool _finished; // event loop stopped ? + std::atomic<bool> _finished; // event loop stopped ? /** * Add an IOComponent to the list of components. This operation is @@ -173,6 +173,10 @@ private: return _shutdown.load(std::memory_order_relaxed); } + [[nodiscard]] bool is_finished() const noexcept { + return _finished.load(std::memory_order_relaxed); + } + public: FNET_TransportThread(const FNET_TransportThread &) = delete; FNET_TransportThread &operator=(const FNET_TransportThread &) = delete; |