From dd33e09806ef4649c79b72c5440221898fc1b4c1 Mon Sep 17 00:00:00 2001 From: Håvard Pettersen Date: Wed, 9 Aug 2023 11:29:06 +0000 Subject: more robust shutdown to avoid multi-close race --- fnet/src/tests/sync_execute/sync_execute.cpp | 8 ++++-- fnet/src/vespa/fnet/transport_thread.cpp | 37 +++++++++++++++------------- fnet/src/vespa/fnet/transport_thread.h | 5 ++-- 3 files changed, 29 insertions(+), 21 deletions(-) diff --git a/fnet/src/tests/sync_execute/sync_execute.cpp b/fnet/src/tests/sync_execute/sync_execute.cpp index 5d2f4097ab4..0dd65b08874 100644 --- a/fnet/src/tests/sync_execute/sync_execute.cpp +++ b/fnet/src/tests/sync_execute/sync_execute.cpp @@ -17,6 +17,8 @@ TEST("sync execute") { DoIt exe2; DoIt exe3; DoIt exe4; + DoIt exe5; + DoIt exe6; FNET_Transport transport; ASSERT_TRUE(transport.execute(&exe1)); ASSERT_TRUE(transport.Start()); @@ -26,14 +28,16 @@ TEST("sync execute") { ASSERT_TRUE(exe2.gate.getCount() == 0u); ASSERT_TRUE(transport.execute(&exe3)); transport.ShutDown(false); - ASSERT_TRUE(!transport.execute(&exe4)); + uint32_t expect_cnt_4 = transport.execute(&exe4) ? 
0 : 1; transport.sync(); transport.WaitFinished(); + ASSERT_TRUE(!transport.execute(&exe5)); transport.sync(); ASSERT_TRUE(exe1.gate.getCount() == 0u); ASSERT_TRUE(exe2.gate.getCount() == 0u); ASSERT_TRUE(exe3.gate.getCount() == 0u); - ASSERT_TRUE(exe4.gate.getCount() == 1u); + ASSERT_TRUE(exe4.gate.getCount() == expect_cnt_4); + ASSERT_TRUE(exe5.gate.getCount() == 1u); } TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/fnet/src/vespa/fnet/transport_thread.cpp b/fnet/src/vespa/fnet/transport_thread.cpp index 0b0df02c04c..217738b7364 100644 --- a/fnet/src/vespa/fnet/transport_thread.cpp +++ b/fnet/src/vespa/fnet/transport_thread.cpp @@ -119,7 +119,7 @@ FNET_TransportThread::PostEvent(FNET_ControlPacket *cpacket, size_t qLen; { std::unique_lock guard(_lock); - if (IsShutDown()) { + if (_reject_events) { guard.unlock(); DiscardEvent(cpacket, context); return false; @@ -243,7 +243,8 @@ FNET_TransportThread::FNET_TransportThread(FNET_Transport &owner_in) _started(false), _shutdown(false), _finished(false), - _detaching() + _detaching(), + _reject_events(false) { trapsigpipe(); } @@ -384,9 +385,9 @@ FNET_TransportThread::ShutDown(bool waitFinished) bool wasEmpty = false; { std::lock_guard guard(_lock); - if (!IsShutDown()) { + if (!should_shut_down()) { _shutdown.store(true, std::memory_order_relaxed); - wasEmpty = _queue.IsEmpty_NoLock(); + wasEmpty = _queue.IsEmpty_NoLock(); } } if (wasEmpty) { @@ -503,7 +504,7 @@ FNET_TransportThread::handle_event(FNET_IOComponent &ctx, bool read, bool write) bool FNET_TransportThread::EventLoopIteration() { - if (!IsShutDown()) { + if (!should_shut_down()) { int msTimeout = vespalib::count_ms(time_tools().event_timeout()); // obtain I/O events _selector.poll(msTimeout); @@ -530,7 +531,7 @@ FNET_TransportThread::EventLoopIteration() { FlushDeleteList(); } // -- END OF MAIN EVENT LOOP -- - if (!IsShutDown()) + if (!should_shut_down()) return true; if (is_finished()) return false; @@ -552,10 +553,22 @@ 
FNET_TransportThread::checkTimedoutComponents(vespalib::duration timeout) { void FNET_TransportThread::endEventLoop() { + // close and remove all I/O Components + FNET_IOComponent *component = _componentsHead; + while (component != nullptr) { + assert(component == _componentsHead); + FNET_IOComponent *tmp = component; + component = component->_ioc_next; + RemoveComponent(tmp); + tmp->Close(); + tmp->internal_subref(); + } + // flush event queue { std::lock_guard guard(_lock); _queue.FlushPackets_NoLock(&_myQueue); + _reject_events = true; } // discard remaining events @@ -569,16 +582,6 @@ FNET_TransportThread::endEventLoop() { } } - // close and remove all I/O Components - FNET_IOComponent *component = _componentsHead; - while (component != nullptr) { - assert(component == _componentsHead); - FNET_IOComponent *tmp = component; - component = component->_ioc_next; - RemoveComponent(tmp); - tmp->Close(); - tmp->internal_subref(); - } assert(_componentsHead == nullptr && _componentsTail == nullptr && _timeOutHead == nullptr && @@ -588,7 +591,7 @@ FNET_TransportThread::endEventLoop() { { std::lock_guard guard(_shutdownLock); - _finished.store(true, std::memory_order_relaxed); + _finished.store(true, std::memory_order_release); _shutdownCond.notify_all(); } diff --git a/fnet/src/vespa/fnet/transport_thread.h b/fnet/src/vespa/fnet/transport_thread.h index 6047d4e3482..c7ada472501 100644 --- a/fnet/src/vespa/fnet/transport_thread.h +++ b/fnet/src/vespa/fnet/transport_thread.h @@ -52,6 +52,7 @@ private: std::atomic _shutdown; // should stop event loop ? std::atomic _finished; // event loop stopped ? std::set _detaching; // server adapters being detached + bool _reject_events; // the transport thread does not want any more events /** * Add an IOComponent to the list of components. 
This operation is @@ -169,12 +170,12 @@ private: **/ bool EventLoopIteration(); - bool IsShutDown() const noexcept { + [[nodiscard]] bool should_shut_down() const noexcept { return _shutdown.load(std::memory_order_relaxed); } [[nodiscard]] bool is_finished() const noexcept { - return _finished.load(std::memory_order_relaxed); + return _finished.load(std::memory_order_acquire); } public: -- cgit v1.2.3