summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2023-08-10 00:29:48 +0200
committerGitHub <noreply@github.com>2023-08-10 00:29:48 +0200
commita589bd0a2c27a09b1bafb3a666f7548d4b86771e (patch)
tree146d44e5baa3ccc8ba31976b6768086cce4d281d
parente8c966890fbc22d6ce5556b0ba708bf0dec4c739 (diff)
parent04d3d92c90e0875e42eae3784ed9aa64cc33fdcd (diff)
Merge pull request #28004 from vespa-engine/havardpe/avoid-multi-close
Havardpe/avoid multi close
-rw-r--r--fnet/src/tests/sync_execute/sync_execute.cpp8
-rw-r--r--fnet/src/vespa/fnet/connection.cpp13
-rw-r--r--fnet/src/vespa/fnet/connection.h2
-rw-r--r--fnet/src/vespa/fnet/transport_thread.cpp37
-rw-r--r--fnet/src/vespa/fnet/transport_thread.h5
5 files changed, 36 insertions, 29 deletions
diff --git a/fnet/src/tests/sync_execute/sync_execute.cpp b/fnet/src/tests/sync_execute/sync_execute.cpp
index 5d2f4097ab4..0dd65b08874 100644
--- a/fnet/src/tests/sync_execute/sync_execute.cpp
+++ b/fnet/src/tests/sync_execute/sync_execute.cpp
@@ -17,6 +17,8 @@ TEST("sync execute") {
DoIt exe2;
DoIt exe3;
DoIt exe4;
+ DoIt exe5;
+ DoIt exe6;
FNET_Transport transport;
ASSERT_TRUE(transport.execute(&exe1));
ASSERT_TRUE(transport.Start());
@@ -26,14 +28,16 @@ TEST("sync execute") {
ASSERT_TRUE(exe2.gate.getCount() == 0u);
ASSERT_TRUE(transport.execute(&exe3));
transport.ShutDown(false);
- ASSERT_TRUE(!transport.execute(&exe4));
+ uint32_t expect_cnt_4 = transport.execute(&exe4) ? 0 : 1;
transport.sync();
transport.WaitFinished();
+ ASSERT_TRUE(!transport.execute(&exe5));
transport.sync();
ASSERT_TRUE(exe1.gate.getCount() == 0u);
ASSERT_TRUE(exe2.gate.getCount() == 0u);
ASSERT_TRUE(exe3.gate.getCount() == 0u);
- ASSERT_TRUE(exe4.gate.getCount() == 1u);
+ ASSERT_TRUE(exe4.gate.getCount() == expect_cnt_4);
+ ASSERT_TRUE(exe5.gate.getCount() == 1u);
}
TEST_MAIN() { TEST_RUN_ALL(); }
diff --git a/fnet/src/vespa/fnet/connection.cpp b/fnet/src/vespa/fnet/connection.cpp
index a770561344f..314fc7517e5 100644
--- a/fnet/src/vespa/fnet/connection.cpp
+++ b/fnet/src/vespa/fnet/connection.cpp
@@ -526,7 +526,7 @@ FNET_Connection::FNET_Connection(FNET_TransportThread *owner,
FNET_Connection::~FNET_Connection()
{
- assert(!_resolve_handler.load());
+ assert(!_resolve_handler);
_num_connections.fetch_sub(1, std::memory_order_relaxed);
}
@@ -541,7 +541,7 @@ FNET_Connection::Init()
// initiate async resolve
if (IsClient()) {
_resolve_handler = std::make_shared<ResolveHandler>(this);
- Owner()->owner().resolve_async(GetSpec(), _resolve_handler.load());
+ Owner()->owner().resolve_async(GetSpec(), _resolve_handler);
}
return true;
}
@@ -555,12 +555,11 @@ FNET_Connection::server_adapter()
bool
FNET_Connection::handle_add_event()
{
- std::shared_ptr<ResolveHandler> resolve_handler = _resolve_handler.exchange({});
- if (resolve_handler) {
+ if (_resolve_handler) {
auto tweak = [this](vespalib::SocketHandle &handle) { return Owner()->tune(handle); };
- _socket = Owner()->owner().create_client_crypto_socket(resolve_handler->address.connect(tweak), vespalib::SocketSpec(GetSpec()));
+ _socket = Owner()->owner().create_client_crypto_socket(_resolve_handler->address.connect(tweak), vespalib::SocketSpec(GetSpec()));
_ioc_socket_fd = _socket->get_fd();
- resolve_handler.reset();
+ _resolve_handler.reset();
}
return (_socket && (_socket->get_fd() >= 0));
}
@@ -681,7 +680,7 @@ FNET_Connection::Sync()
void
FNET_Connection::Close()
{
- _resolve_handler.store({});
+ _resolve_handler.reset();
detach_selector();
SetState(FNET_CLOSED);
_ioc_socket_fd = -1;
diff --git a/fnet/src/vespa/fnet/connection.h b/fnet/src/vespa/fnet/connection.h
index 8fe78cba8c2..0db71db14e0 100644
--- a/fnet/src/vespa/fnet/connection.h
+++ b/fnet/src/vespa/fnet/connection.h
@@ -71,7 +71,7 @@ private:
void handle_result(vespalib::SocketAddress result) override;
~ResolveHandler();
};
- using ResolveHandlerSP = std::atomic<std::shared_ptr<ResolveHandler>>;
+ using ResolveHandlerSP = std::shared_ptr<ResolveHandler>;
FNET_IPacketStreamer *_streamer; // custom packet streamer
FNET_IServerAdapter *_serverAdapter; // only on server side
vespalib::CryptoSocket::UP _socket; // socket for this conn
diff --git a/fnet/src/vespa/fnet/transport_thread.cpp b/fnet/src/vespa/fnet/transport_thread.cpp
index 0b0df02c04c..217738b7364 100644
--- a/fnet/src/vespa/fnet/transport_thread.cpp
+++ b/fnet/src/vespa/fnet/transport_thread.cpp
@@ -119,7 +119,7 @@ FNET_TransportThread::PostEvent(FNET_ControlPacket *cpacket,
size_t qLen;
{
std::unique_lock<std::mutex> guard(_lock);
- if (IsShutDown()) {
+ if (_reject_events) {
guard.unlock();
DiscardEvent(cpacket, context);
return false;
@@ -243,7 +243,8 @@ FNET_TransportThread::FNET_TransportThread(FNET_Transport &owner_in)
_started(false),
_shutdown(false),
_finished(false),
- _detaching()
+ _detaching(),
+ _reject_events(false)
{
trapsigpipe();
}
@@ -384,9 +385,9 @@ FNET_TransportThread::ShutDown(bool waitFinished)
bool wasEmpty = false;
{
std::lock_guard<std::mutex> guard(_lock);
- if (!IsShutDown()) {
+ if (!should_shut_down()) {
_shutdown.store(true, std::memory_order_relaxed);
- wasEmpty = _queue.IsEmpty_NoLock();
+ wasEmpty = _queue.IsEmpty_NoLock();
}
}
if (wasEmpty) {
@@ -503,7 +504,7 @@ FNET_TransportThread::handle_event(FNET_IOComponent &ctx, bool read, bool write)
bool
FNET_TransportThread::EventLoopIteration() {
- if (!IsShutDown()) {
+ if (!should_shut_down()) {
int msTimeout = vespalib::count_ms(time_tools().event_timeout());
// obtain I/O events
_selector.poll(msTimeout);
@@ -530,7 +531,7 @@ FNET_TransportThread::EventLoopIteration() {
FlushDeleteList();
} // -- END OF MAIN EVENT LOOP --
- if (!IsShutDown())
+ if (!should_shut_down())
return true;
if (is_finished())
return false;
@@ -552,10 +553,22 @@ FNET_TransportThread::checkTimedoutComponents(vespalib::duration timeout) {
void
FNET_TransportThread::endEventLoop() {
+ // close and remove all I/O Components
+ FNET_IOComponent *component = _componentsHead;
+ while (component != nullptr) {
+ assert(component == _componentsHead);
+ FNET_IOComponent *tmp = component;
+ component = component->_ioc_next;
+ RemoveComponent(tmp);
+ tmp->Close();
+ tmp->internal_subref();
+ }
+
// flush event queue
{
std::lock_guard<std::mutex> guard(_lock);
_queue.FlushPackets_NoLock(&_myQueue);
+ _reject_events = true;
}
// discard remaining events
@@ -569,16 +582,6 @@ FNET_TransportThread::endEventLoop() {
}
}
- // close and remove all I/O Components
- FNET_IOComponent *component = _componentsHead;
- while (component != nullptr) {
- assert(component == _componentsHead);
- FNET_IOComponent *tmp = component;
- component = component->_ioc_next;
- RemoveComponent(tmp);
- tmp->Close();
- tmp->internal_subref();
- }
assert(_componentsHead == nullptr &&
_componentsTail == nullptr &&
_timeOutHead == nullptr &&
@@ -588,7 +591,7 @@ FNET_TransportThread::endEventLoop() {
{
std::lock_guard<std::mutex> guard(_shutdownLock);
- _finished.store(true, std::memory_order_relaxed);
+ _finished.store(true, std::memory_order_release);
_shutdownCond.notify_all();
}
diff --git a/fnet/src/vespa/fnet/transport_thread.h b/fnet/src/vespa/fnet/transport_thread.h
index 6047d4e3482..c7ada472501 100644
--- a/fnet/src/vespa/fnet/transport_thread.h
+++ b/fnet/src/vespa/fnet/transport_thread.h
@@ -52,6 +52,7 @@ private:
std::atomic<bool> _shutdown; // should stop event loop ?
std::atomic<bool> _finished; // event loop stopped ?
std::set<FNET_IServerAdapter*> _detaching; // server adapters being detached
+ bool _reject_events; // the transport thread does not want any more events
/**
* Add an IOComponent to the list of components. This operation is
@@ -169,12 +170,12 @@ private:
**/
bool EventLoopIteration();
- bool IsShutDown() const noexcept {
+ [[nodiscard]] bool should_shut_down() const noexcept {
return _shutdown.load(std::memory_order_relaxed);
}
[[nodiscard]] bool is_finished() const noexcept {
- return _finished.load(std::memory_order_relaxed);
+ return _finished.load(std::memory_order_acquire);
}
public: