summaryrefslogtreecommitdiffstats
path: root/messagebus
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@yahooinc.com>2022-02-18 09:44:34 +0000
committerTor Brede Vekterli <vekterli@yahooinc.com>2022-02-18 15:38:33 +0000
commit3cb027ecb236355a78429e8ed99d62aa0889d7f4 (patch)
tree342f79cd07523fca1569531cc938340c5defb95f /messagebus
parent42a6c0381614f363fa4f7748463fbcea94f7d7f2 (diff)
Make MessageBus resend-queue thread safe
May be accessed from both sender thread (in case of synchronous send failure) as well as periodic resending from MBus thread.
Diffstat (limited to 'messagebus')
-rw-r--r--messagebus/src/vespa/messagebus/routing/resender.cpp19
-rw-r--r--messagebus/src/vespa/messagebus/routing/resender.h10
2 files changed, 18 insertions, 11 deletions
diff --git a/messagebus/src/vespa/messagebus/routing/resender.cpp b/messagebus/src/vespa/messagebus/routing/resender.cpp
index eb959dc17b4..80e5925f1a6 100644
--- a/messagebus/src/vespa/messagebus/routing/resender.cpp
+++ b/messagebus/src/vespa/messagebus/routing/resender.cpp
@@ -10,9 +10,10 @@ using namespace std::chrono;
namespace mbus {
-Resender::Resender(IRetryPolicy::SP retryPolicy) :
- _queue(),
- _retryPolicy(retryPolicy)
+Resender::Resender(IRetryPolicy::SP retryPolicy)
+ : _queue_mutex(),
+ _queue(),
+ _retryPolicy(retryPolicy)
{ }
Resender::~Resender()
@@ -26,13 +27,16 @@ Resender::~Resender()
void
Resender::resendScheduled()
{
- typedef std::vector<RoutingNode*> NodeList;
+ using NodeList = std::vector<RoutingNode*>;
NodeList sendList;
time_point now = steady_clock::now();
- while (!_queue.empty() && _queue.top().first <= now) {
- sendList.push_back(_queue.top().second);
- _queue.pop();
+ {
+ std::lock_guard guard(_queue_mutex);
+ while (!_queue.empty() && _queue.top().first <= now) {
+ sendList.push_back(_queue.top().second);
+ _queue.pop();
+ }
}
for (RoutingNode *node : sendList) {
@@ -84,6 +88,7 @@ Resender::scheduleRetry(RoutingNode &node)
TraceLevel::COMPONENT,
vespalib::make_string("Message scheduled for retry %u in %.3f seconds.", retry, delay));
msg.setRetry(retry);
+ std::lock_guard guard(_queue_mutex);
_queue.push(Entry(steady_clock::now() + delayMS, &node));
return true;
}
diff --git a/messagebus/src/vespa/messagebus/routing/resender.h b/messagebus/src/vespa/messagebus/routing/resender.h
index 599ac789cab..fbce5c7fe8e 100644
--- a/messagebus/src/vespa/messagebus/routing/resender.h
+++ b/messagebus/src/vespa/messagebus/routing/resender.h
@@ -4,6 +4,7 @@
#include "iretrypolicy.h"
#include <vespa/messagebus/queue.h>
#include <vespa/messagebus/reply.h>
+#include <mutex>
#include <queue>
#include <vector>
@@ -30,6 +31,7 @@ private:
};
using PriorityQueue = std::priority_queue<Entry, std::vector<Entry>, Cmp>;
+ std::mutex _queue_mutex;
PriorityQueue _queue;
IRetryPolicy::SP _retryPolicy;
public:
@@ -45,7 +47,7 @@ public:
*
* @param retryPolicy The retry policy to use.
*/
- Resender(IRetryPolicy::SP retryPolicy);
+ explicit Resender(IRetryPolicy::SP retryPolicy);
/**
* Empties the retry queue.
@@ -59,7 +61,7 @@ public:
* @param errorCode The code to check.
* @return True if the message can be resent.
*/
- bool canRetry(uint32_t errorCode) const;
+ [[nodiscard]] bool canRetry(uint32_t errorCode) const;
/**
* Returns whether or not the given reply should be retried.
@@ -67,7 +69,7 @@ public:
* @param reply The reply to check.
* @return True if retry is required.
*/
- bool shouldRetry(const Reply &reply) const;
+ [[nodiscard]] bool shouldRetry(const Reply &reply) const;
/**
* Schedules the given node for resending, if enabled by message. This will
@@ -78,7 +80,7 @@ public:
* @param node The node to resend.
* @return True if the node was queued.
*/
- bool scheduleRetry(RoutingNode &node);
+ [[nodiscard]] bool scheduleRetry(RoutingNode &node);
/**
* Invokes {@link RoutingNode#send()} on all routing nodes that are