diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2022-02-21 17:50:17 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-02-21 17:50:17 +0100 |
commit | a7e8bb9dcf3c674a3756e0f0383384593856415a (patch) | |
tree | 3944389e6b3d0e5b0ef7992808a3ca1ff24ff260 /messagebus/src | |
parent | f67ad6e4bdb5cf4b834428c61bc18953d9efd761 (diff) | |
parent | eddc91fb205d4bc8e68aa72be86ed39a199728b5 (diff) |
Merge pull request #21285 from vespa-engine/vekterli/more-threading-fixes
More miscellaneous threading fixes [run-systemtest]
Diffstat (limited to 'messagebus/src')
4 files changed, 34 insertions, 18 deletions
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp index ed2ce3d638e..c33f918a39c 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp @@ -423,6 +423,8 @@ RPCNetwork::sync() void RPCNetwork::shutdown() { + // Unschedule any pending target pool flush task that may race with shutdown target flushing + _scheduler.Kill(_targetPoolTask.get()); _transport->ShutDown(true); _threadPool->Close(); _executor->shutdown().sync(); diff --git a/messagebus/src/vespa/messagebus/network/rpctargetpool.cpp b/messagebus/src/vespa/messagebus/network/rpctargetpool.cpp index 44e6890415a..b403c65f863 100644 --- a/messagebus/src/vespa/messagebus/network/rpctargetpool.cpp +++ b/messagebus/src/vespa/messagebus/network/rpctargetpool.cpp @@ -54,14 +54,21 @@ void RPCTargetPool::flushTargets(bool force) { uint64_t currentTime = _timer->getMilliTime(); + // Erase RPC targets outside our lock to prevent the following mutex order inversion potential: + // flushTargets (pool lock) -> FNET transport thread post event (transport thread lock) + // FNET CheckTasks (transport thread lock) -> periodic flushTargets task run -> flushTargets (pool lock) + std::vector<Entry> to_erase_on_scope_exit; LockGuard guard(_lock); - TargetMap::iterator it = _targets.begin(); - while (it != _targets.end()) { - const Entry &entry = it->second; - if ( ! entry.inUse(guard) && (force || ((entry.lastUse() + _expireMillis) < currentTime))) { - _targets.erase(it++); // postfix increment to move the iterator - } else { - ++it; + { + auto it = _targets.begin(); + while (it != _targets.end()) { + const Entry& entry = it->second; + if (!entry.inUse(guard) && (force || ((entry.lastUse() + _expireMillis) < currentTime))) { + to_erase_on_scope_exit.emplace_back(std::move(it->second)); + it = _targets.erase(it); + } else { + ++it; + } } } } diff --git a/messagebus/src/vespa/messagebus/routing/resender.cpp b/messagebus/src/vespa/messagebus/routing/resender.cpp index eb959dc17b4..80e5925f1a6 100644 --- a/messagebus/src/vespa/messagebus/routing/resender.cpp +++ b/messagebus/src/vespa/messagebus/routing/resender.cpp @@ -10,9 +10,10 @@ using namespace std::chrono; namespace mbus { -Resender::Resender(IRetryPolicy::SP retryPolicy) : - _queue(), - _retryPolicy(retryPolicy) +Resender::Resender(IRetryPolicy::SP retryPolicy) + : _queue_mutex(), + _queue(), + _retryPolicy(retryPolicy) { } Resender::~Resender() @@ -26,13 +27,16 @@ Resender::~Resender() void Resender::resendScheduled() { - typedef std::vector<RoutingNode*> NodeList; + using NodeList = std::vector<RoutingNode*>; NodeList sendList; time_point now = steady_clock::now(); - while (!_queue.empty() && _queue.top().first <= now) { - sendList.push_back(_queue.top().second); - _queue.pop(); + { + std::lock_guard guard(_queue_mutex); + while (!_queue.empty() && _queue.top().first <= now) { + sendList.push_back(_queue.top().second); + _queue.pop(); + } } for (RoutingNode *node : sendList) { @@ -84,6 +88,7 @@ Resender::scheduleRetry(RoutingNode &node) TraceLevel::COMPONENT, vespalib::make_string("Message scheduled for retry %u in %.3f seconds.", retry, delay)); msg.setRetry(retry); + std::lock_guard guard(_queue_mutex); _queue.push(Entry(steady_clock::now() + delayMS, &node)); return true; } diff --git a/messagebus/src/vespa/messagebus/routing/resender.h b/messagebus/src/vespa/messagebus/routing/resender.h index 599ac789cab..fbce5c7fe8e 100644 --- a/messagebus/src/vespa/messagebus/routing/resender.h +++ b/messagebus/src/vespa/messagebus/routing/resender.h @@ -4,6 +4,7 @@ #include "iretrypolicy.h" #include <vespa/messagebus/queue.h> #include <vespa/messagebus/reply.h> +#include <mutex> #include <queue> #include <vector> @@ -30,6 +31,7 @@ private: }; using PriorityQueue = std::priority_queue<Entry, std::vector<Entry>, Cmp>; + std::mutex _queue_mutex; PriorityQueue _queue; IRetryPolicy::SP _retryPolicy; public: @@ -45,7 +47,7 @@ public: * * @param retryPolicy The retry policy to use. */ - Resender(IRetryPolicy::SP retryPolicy); + explicit Resender(IRetryPolicy::SP retryPolicy); /** * Empties the retry queue. @@ -59,7 +61,7 @@ public: * @param errorCode The code to check. * @return True if the message can be resent. */ - bool canRetry(uint32_t errorCode) const; + [[nodiscard]] bool canRetry(uint32_t errorCode) const; /** * Returns whether or not the given reply should be retried. @@ -67,7 +69,7 @@ public: * @param reply The reply to check. * @return True if retry is required. */ - bool shouldRetry(const Reply &reply) const; + [[nodiscard]] bool shouldRetry(const Reply &reply) const; /** * Schedules the given node for resending, if enabled by message. This will @@ -78,7 +80,7 @@ public: * @param node The node to resend. * @return True if the node was queued. */ - bool scheduleRetry(RoutingNode &node); + [[nodiscard]] bool scheduleRetry(RoutingNode &node); /** * Invokes {@link RoutingNode#send()} on all routing nodes that are |