summaryrefslogtreecommitdiffstats
path: root/messagebus
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2022-02-21 17:50:17 +0100
committerGitHub <noreply@github.com>2022-02-21 17:50:17 +0100
commita7e8bb9dcf3c674a3756e0f0383384593856415a (patch)
tree3944389e6b3d0e5b0ef7992808a3ca1ff24ff260 /messagebus
parentf67ad6e4bdb5cf4b834428c61bc18953d9efd761 (diff)
parenteddc91fb205d4bc8e68aa72be86ed39a199728b5 (diff)
Merge pull request #21285 from vespa-engine/vekterli/more-threading-fixes
More miscellaneous threading fixes [run-systemtest]
Diffstat (limited to 'messagebus')
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetwork.cpp2
-rw-r--r--messagebus/src/vespa/messagebus/network/rpctargetpool.cpp21
-rw-r--r--messagebus/src/vespa/messagebus/routing/resender.cpp19
-rw-r--r--messagebus/src/vespa/messagebus/routing/resender.h10
4 files changed, 34 insertions, 18 deletions
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
index ed2ce3d638e..c33f918a39c 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
@@ -423,6 +423,8 @@ RPCNetwork::sync()
void
RPCNetwork::shutdown()
{
+ // Unschedule any pending target pool flush task that may race with shutdown target flushing
+ _scheduler.Kill(_targetPoolTask.get());
_transport->ShutDown(true);
_threadPool->Close();
_executor->shutdown().sync();
diff --git a/messagebus/src/vespa/messagebus/network/rpctargetpool.cpp b/messagebus/src/vespa/messagebus/network/rpctargetpool.cpp
index 44e6890415a..b403c65f863 100644
--- a/messagebus/src/vespa/messagebus/network/rpctargetpool.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpctargetpool.cpp
@@ -54,14 +54,21 @@ void
RPCTargetPool::flushTargets(bool force)
{
uint64_t currentTime = _timer->getMilliTime();
+ // Erase RPC targets outside our lock to prevent the following mutex order inversion potential:
+ // flushTargets (pool lock) -> FNET transport thread post event (transport thread lock)
+ // FNET CheckTasks (transport thread lock) -> periodic flushTargets task run -> flushTargets (pool lock)
+ std::vector<Entry> to_erase_on_scope_exit;
LockGuard guard(_lock);
- TargetMap::iterator it = _targets.begin();
- while (it != _targets.end()) {
- const Entry &entry = it->second;
- if ( ! entry.inUse(guard) && (force || ((entry.lastUse() + _expireMillis) < currentTime))) {
- _targets.erase(it++); // postfix increment to move the iterator
- } else {
- ++it;
+ {
+ auto it = _targets.begin();
+ while (it != _targets.end()) {
+ const Entry& entry = it->second;
+ if (!entry.inUse(guard) && (force || ((entry.lastUse() + _expireMillis) < currentTime))) {
+ to_erase_on_scope_exit.emplace_back(std::move(it->second));
+ it = _targets.erase(it);
+ } else {
+ ++it;
+ }
}
}
}
diff --git a/messagebus/src/vespa/messagebus/routing/resender.cpp b/messagebus/src/vespa/messagebus/routing/resender.cpp
index eb959dc17b4..80e5925f1a6 100644
--- a/messagebus/src/vespa/messagebus/routing/resender.cpp
+++ b/messagebus/src/vespa/messagebus/routing/resender.cpp
@@ -10,9 +10,10 @@ using namespace std::chrono;
namespace mbus {
-Resender::Resender(IRetryPolicy::SP retryPolicy) :
- _queue(),
- _retryPolicy(retryPolicy)
+Resender::Resender(IRetryPolicy::SP retryPolicy)
+ : _queue_mutex(),
+ _queue(),
+ _retryPolicy(retryPolicy)
{ }
Resender::~Resender()
@@ -26,13 +27,16 @@ Resender::~Resender()
void
Resender::resendScheduled()
{
- typedef std::vector<RoutingNode*> NodeList;
+ using NodeList = std::vector<RoutingNode*>;
NodeList sendList;
time_point now = steady_clock::now();
- while (!_queue.empty() && _queue.top().first <= now) {
- sendList.push_back(_queue.top().second);
- _queue.pop();
+ {
+ std::lock_guard guard(_queue_mutex);
+ while (!_queue.empty() && _queue.top().first <= now) {
+ sendList.push_back(_queue.top().second);
+ _queue.pop();
+ }
}
for (RoutingNode *node : sendList) {
@@ -84,6 +88,7 @@ Resender::scheduleRetry(RoutingNode &node)
TraceLevel::COMPONENT,
vespalib::make_string("Message scheduled for retry %u in %.3f seconds.", retry, delay));
msg.setRetry(retry);
+ std::lock_guard guard(_queue_mutex);
_queue.push(Entry(steady_clock::now() + delayMS, &node));
return true;
}
diff --git a/messagebus/src/vespa/messagebus/routing/resender.h b/messagebus/src/vespa/messagebus/routing/resender.h
index 599ac789cab..fbce5c7fe8e 100644
--- a/messagebus/src/vespa/messagebus/routing/resender.h
+++ b/messagebus/src/vespa/messagebus/routing/resender.h
@@ -4,6 +4,7 @@
#include "iretrypolicy.h"
#include <vespa/messagebus/queue.h>
#include <vespa/messagebus/reply.h>
+#include <mutex>
#include <queue>
#include <vector>
@@ -30,6 +31,7 @@ private:
};
using PriorityQueue = std::priority_queue<Entry, std::vector<Entry>, Cmp>;
+ std::mutex _queue_mutex;
PriorityQueue _queue;
IRetryPolicy::SP _retryPolicy;
public:
@@ -45,7 +47,7 @@ public:
*
* @param retryPolicy The retry policy to use.
*/
- Resender(IRetryPolicy::SP retryPolicy);
+ explicit Resender(IRetryPolicy::SP retryPolicy);
/**
* Empties the retry queue.
@@ -59,7 +61,7 @@ public:
* @param errorCode The code to check.
* @return True if the message can be resent.
*/
- bool canRetry(uint32_t errorCode) const;
+ [[nodiscard]] bool canRetry(uint32_t errorCode) const;
/**
* Returns whether or not the given reply should be retried.
@@ -67,7 +69,7 @@ public:
* @param reply The reply to check.
* @return True if retry is required.
*/
- bool shouldRetry(const Reply &reply) const;
+ [[nodiscard]] bool shouldRetry(const Reply &reply) const;
/**
* Schedules the given node for resending, if enabled by message. This will
@@ -78,7 +80,7 @@ public:
* @param node The node to resend.
* @return True if the node was queued.
*/
- bool scheduleRetry(RoutingNode &node);
+ [[nodiscard]] bool scheduleRetry(RoutingNode &node);
/**
* Invokes {@link RoutingNode#send()} on all routing nodes that are