aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHåvard Pettersen <havardpe@yahooinc.com>2023-08-09 11:29:06 +0000
committerHåvard Pettersen <havardpe@yahooinc.com>2023-08-09 13:59:28 +0000
commitdd33e09806ef4649c79b72c5440221898fc1b4c1 (patch)
tree9c856283e411d87db4ab6af69bdc912c4c531b61
parent0f19048a0da9e582fc836697ed4f15d7801fdabe (diff)
more robust shutdown to avoid multi-close race
-rw-r--r--fnet/src/tests/sync_execute/sync_execute.cpp8
-rw-r--r--fnet/src/vespa/fnet/transport_thread.cpp37
-rw-r--r--fnet/src/vespa/fnet/transport_thread.h5
3 files changed, 29 insertions, 21 deletions
diff --git a/fnet/src/tests/sync_execute/sync_execute.cpp b/fnet/src/tests/sync_execute/sync_execute.cpp
index 5d2f4097ab4..0dd65b08874 100644
--- a/fnet/src/tests/sync_execute/sync_execute.cpp
+++ b/fnet/src/tests/sync_execute/sync_execute.cpp
@@ -17,6 +17,8 @@ TEST("sync execute") {
DoIt exe2;
DoIt exe3;
DoIt exe4;
+ DoIt exe5;
+ DoIt exe6;
FNET_Transport transport;
ASSERT_TRUE(transport.execute(&exe1));
ASSERT_TRUE(transport.Start());
@@ -26,14 +28,16 @@ TEST("sync execute") {
ASSERT_TRUE(exe2.gate.getCount() == 0u);
ASSERT_TRUE(transport.execute(&exe3));
transport.ShutDown(false);
- ASSERT_TRUE(!transport.execute(&exe4));
+ uint32_t expect_cnt_4 = transport.execute(&exe4) ? 0 : 1;
transport.sync();
transport.WaitFinished();
+ ASSERT_TRUE(!transport.execute(&exe5));
transport.sync();
ASSERT_TRUE(exe1.gate.getCount() == 0u);
ASSERT_TRUE(exe2.gate.getCount() == 0u);
ASSERT_TRUE(exe3.gate.getCount() == 0u);
- ASSERT_TRUE(exe4.gate.getCount() == 1u);
+ ASSERT_TRUE(exe4.gate.getCount() == expect_cnt_4);
+ ASSERT_TRUE(exe5.gate.getCount() == 1u);
}
TEST_MAIN() { TEST_RUN_ALL(); }
diff --git a/fnet/src/vespa/fnet/transport_thread.cpp b/fnet/src/vespa/fnet/transport_thread.cpp
index 0b0df02c04c..217738b7364 100644
--- a/fnet/src/vespa/fnet/transport_thread.cpp
+++ b/fnet/src/vespa/fnet/transport_thread.cpp
@@ -119,7 +119,7 @@ FNET_TransportThread::PostEvent(FNET_ControlPacket *cpacket,
size_t qLen;
{
std::unique_lock<std::mutex> guard(_lock);
- if (IsShutDown()) {
+ if (_reject_events) {
guard.unlock();
DiscardEvent(cpacket, context);
return false;
@@ -243,7 +243,8 @@ FNET_TransportThread::FNET_TransportThread(FNET_Transport &owner_in)
_started(false),
_shutdown(false),
_finished(false),
- _detaching()
+ _detaching(),
+ _reject_events(false)
{
trapsigpipe();
}
@@ -384,9 +385,9 @@ FNET_TransportThread::ShutDown(bool waitFinished)
bool wasEmpty = false;
{
std::lock_guard<std::mutex> guard(_lock);
- if (!IsShutDown()) {
+ if (!should_shut_down()) {
_shutdown.store(true, std::memory_order_relaxed);
- wasEmpty = _queue.IsEmpty_NoLock();
+ wasEmpty = _queue.IsEmpty_NoLock();
}
}
if (wasEmpty) {
@@ -503,7 +504,7 @@ FNET_TransportThread::handle_event(FNET_IOComponent &ctx, bool read, bool write)
bool
FNET_TransportThread::EventLoopIteration() {
- if (!IsShutDown()) {
+ if (!should_shut_down()) {
int msTimeout = vespalib::count_ms(time_tools().event_timeout());
// obtain I/O events
_selector.poll(msTimeout);
@@ -530,7 +531,7 @@ FNET_TransportThread::EventLoopIteration() {
FlushDeleteList();
} // -- END OF MAIN EVENT LOOP --
- if (!IsShutDown())
+ if (!should_shut_down())
return true;
if (is_finished())
return false;
@@ -552,10 +553,22 @@ FNET_TransportThread::checkTimedoutComponents(vespalib::duration timeout) {
void
FNET_TransportThread::endEventLoop() {
+ // close and remove all I/O Components
+ FNET_IOComponent *component = _componentsHead;
+ while (component != nullptr) {
+ assert(component == _componentsHead);
+ FNET_IOComponent *tmp = component;
+ component = component->_ioc_next;
+ RemoveComponent(tmp);
+ tmp->Close();
+ tmp->internal_subref();
+ }
+
// flush event queue
{
std::lock_guard<std::mutex> guard(_lock);
_queue.FlushPackets_NoLock(&_myQueue);
+ _reject_events = true;
}
// discard remaining events
@@ -569,16 +582,6 @@ FNET_TransportThread::endEventLoop() {
}
}
- // close and remove all I/O Components
- FNET_IOComponent *component = _componentsHead;
- while (component != nullptr) {
- assert(component == _componentsHead);
- FNET_IOComponent *tmp = component;
- component = component->_ioc_next;
- RemoveComponent(tmp);
- tmp->Close();
- tmp->internal_subref();
- }
assert(_componentsHead == nullptr &&
_componentsTail == nullptr &&
_timeOutHead == nullptr &&
@@ -588,7 +591,7 @@ FNET_TransportThread::endEventLoop() {
{
std::lock_guard<std::mutex> guard(_shutdownLock);
- _finished.store(true, std::memory_order_relaxed);
+ _finished.store(true, std::memory_order_release);
_shutdownCond.notify_all();
}
diff --git a/fnet/src/vespa/fnet/transport_thread.h b/fnet/src/vespa/fnet/transport_thread.h
index 6047d4e3482..c7ada472501 100644
--- a/fnet/src/vespa/fnet/transport_thread.h
+++ b/fnet/src/vespa/fnet/transport_thread.h
@@ -52,6 +52,7 @@ private:
std::atomic<bool> _shutdown; // should stop event loop ?
std::atomic<bool> _finished; // event loop stopped ?
std::set<FNET_IServerAdapter*> _detaching; // server adapters being detached
+ bool _reject_events; // the transport thread does not want any more events
/**
* Add an IOComponent to the list of components. This operation is
@@ -169,12 +170,12 @@ private:
**/
bool EventLoopIteration();
- bool IsShutDown() const noexcept {
+ [[nodiscard]] bool should_shut_down() const noexcept {
return _shutdown.load(std::memory_order_relaxed);
}
[[nodiscard]] bool is_finished() const noexcept {
- return _finished.load(std::memory_order_relaxed);
+ return _finished.load(std::memory_order_acquire);
}
public: