diff options
author | Håvard Pettersen <havardpe@yahooinc.com> | 2023-08-09 11:29:06 +0000 |
---|---|---|
committer | Håvard Pettersen <havardpe@yahooinc.com> | 2023-08-09 13:59:28 +0000 |
commit | dd33e09806ef4649c79b72c5440221898fc1b4c1 (patch) | |
tree | 9c856283e411d87db4ab6af69bdc912c4c531b61 | |
parent | 0f19048a0da9e582fc836697ed4f15d7801fdabe (diff) |
more robust shutdown to avoid multi-close race
-rw-r--r-- | fnet/src/tests/sync_execute/sync_execute.cpp | 8 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/transport_thread.cpp | 37 | ||||
-rw-r--r-- | fnet/src/vespa/fnet/transport_thread.h | 5 |
3 files changed, 29 insertions, 21 deletions
diff --git a/fnet/src/tests/sync_execute/sync_execute.cpp b/fnet/src/tests/sync_execute/sync_execute.cpp index 5d2f4097ab4..0dd65b08874 100644 --- a/fnet/src/tests/sync_execute/sync_execute.cpp +++ b/fnet/src/tests/sync_execute/sync_execute.cpp @@ -17,6 +17,8 @@ TEST("sync execute") { DoIt exe2; DoIt exe3; DoIt exe4; + DoIt exe5; + DoIt exe6; FNET_Transport transport; ASSERT_TRUE(transport.execute(&exe1)); ASSERT_TRUE(transport.Start()); @@ -26,14 +28,16 @@ TEST("sync execute") { ASSERT_TRUE(exe2.gate.getCount() == 0u); ASSERT_TRUE(transport.execute(&exe3)); transport.ShutDown(false); - ASSERT_TRUE(!transport.execute(&exe4)); + uint32_t expect_cnt_4 = transport.execute(&exe4) ? 0 : 1; transport.sync(); transport.WaitFinished(); + ASSERT_TRUE(!transport.execute(&exe5)); transport.sync(); ASSERT_TRUE(exe1.gate.getCount() == 0u); ASSERT_TRUE(exe2.gate.getCount() == 0u); ASSERT_TRUE(exe3.gate.getCount() == 0u); - ASSERT_TRUE(exe4.gate.getCount() == 1u); + ASSERT_TRUE(exe4.gate.getCount() == expect_cnt_4); + ASSERT_TRUE(exe5.gate.getCount() == 1u); } TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/fnet/src/vespa/fnet/transport_thread.cpp b/fnet/src/vespa/fnet/transport_thread.cpp index 0b0df02c04c..217738b7364 100644 --- a/fnet/src/vespa/fnet/transport_thread.cpp +++ b/fnet/src/vespa/fnet/transport_thread.cpp @@ -119,7 +119,7 @@ FNET_TransportThread::PostEvent(FNET_ControlPacket *cpacket, size_t qLen; { std::unique_lock<std::mutex> guard(_lock); - if (IsShutDown()) { + if (_reject_events) { guard.unlock(); DiscardEvent(cpacket, context); return false; @@ -243,7 +243,8 @@ FNET_TransportThread::FNET_TransportThread(FNET_Transport &owner_in) _started(false), _shutdown(false), _finished(false), - _detaching() + _detaching(), + _reject_events(false) { trapsigpipe(); } @@ -384,9 +385,9 @@ FNET_TransportThread::ShutDown(bool waitFinished) bool wasEmpty = false; { std::lock_guard<std::mutex> guard(_lock); - if (!IsShutDown()) { + if (!should_shut_down()) { _shutdown.store(true, std::memory_order_relaxed); - wasEmpty = _queue.IsEmpty_NoLock(); + wasEmpty = _queue.IsEmpty_NoLock(); } } if (wasEmpty) { @@ -503,7 +504,7 @@ FNET_TransportThread::handle_event(FNET_IOComponent &ctx, bool read, bool write) bool FNET_TransportThread::EventLoopIteration() { - if (!IsShutDown()) { + if (!should_shut_down()) { int msTimeout = vespalib::count_ms(time_tools().event_timeout()); // obtain I/O events _selector.poll(msTimeout); @@ -530,7 +531,7 @@ FNET_TransportThread::EventLoopIteration() { FlushDeleteList(); } // -- END OF MAIN EVENT LOOP -- - if (!IsShutDown()) + if (!should_shut_down()) return true; if (is_finished()) return false; @@ -552,10 +553,22 @@ FNET_TransportThread::checkTimedoutComponents(vespalib::duration timeout) { void FNET_TransportThread::endEventLoop() { + // close and remove all I/O Components + FNET_IOComponent *component = _componentsHead; + while (component != nullptr) { + assert(component == _componentsHead); + FNET_IOComponent *tmp = component; + component = component->_ioc_next; + RemoveComponent(tmp); + tmp->Close(); + tmp->internal_subref(); + } + // flush event queue { std::lock_guard<std::mutex> guard(_lock); _queue.FlushPackets_NoLock(&_myQueue); + _reject_events = true; } // discard remaining events @@ -569,16 +582,6 @@ FNET_TransportThread::endEventLoop() { } } - // close and remove all I/O Components - FNET_IOComponent *component = _componentsHead; - while (component != nullptr) { - assert(component == _componentsHead); - FNET_IOComponent *tmp = component; - component = component->_ioc_next; - RemoveComponent(tmp); - tmp->Close(); - tmp->internal_subref(); - } assert(_componentsHead == nullptr && _componentsTail == nullptr && _timeOutHead == nullptr && @@ -588,7 +591,7 @@ FNET_TransportThread::endEventLoop() { { std::lock_guard<std::mutex> guard(_shutdownLock); - _finished.store(true, std::memory_order_relaxed); + _finished.store(true, std::memory_order_release); _shutdownCond.notify_all(); } diff --git a/fnet/src/vespa/fnet/transport_thread.h b/fnet/src/vespa/fnet/transport_thread.h index 6047d4e3482..c7ada472501 100644 --- a/fnet/src/vespa/fnet/transport_thread.h +++ b/fnet/src/vespa/fnet/transport_thread.h @@ -52,6 +52,7 @@ private: std::atomic<bool> _shutdown; // should stop event loop ? std::atomic<bool> _finished; // event loop stopped ? std::set<FNET_IServerAdapter*> _detaching; // server adapters being detached + bool _reject_events; // the transport thread does not want any more events /** * Add an IOComponent to the list of components. This operation is @@ -169,12 +170,12 @@ private: **/ bool EventLoopIteration(); - bool IsShutDown() const noexcept { + [[nodiscard]] bool should_shut_down() const noexcept { return _shutdown.load(std::memory_order_relaxed); } [[nodiscard]] bool is_finished() const noexcept { - return _finished.load(std::memory_order_relaxed); + return _finished.load(std::memory_order_acquire); } public: |