summaryrefslogtreecommitdiffstats
path: root/messagebus
diff options
context:
space:
mode:
authorHåvard Pettersen <havardpe@oath.com>2022-03-14 10:54:24 +0000
committerHåvard Pettersen <havardpe@oath.com>2022-03-14 10:54:24 +0000
commit424ba43f3c8f677c04fdc4dc840cbd8b5f0e9034 (patch)
treeb29975bb6b44ee7ee6085bcabce72c50d11716f5 /messagebus
parent7cf63413a6b1873d901acea154e40ccbf4d8147b (diff)
avoid race in trace messages regarding pending count
Diffstat (limited to 'messagebus')
-rw-r--r--messagebus/src/vespa/messagebus/sourcesession.cpp9
1 files changed, 6 insertions, 3 deletions
diff --git a/messagebus/src/vespa/messagebus/sourcesession.cpp b/messagebus/src/vespa/messagebus/sourcesession.cpp
index 0cf7135a1db..d4440b60895 100644
--- a/messagebus/src/vespa/messagebus/sourcesession.cpp
+++ b/messagebus/src/vespa/messagebus/sourcesession.cpp
@@ -75,6 +75,7 @@ SourceSession::send(Message::UP msg)
if (msg->getTimeRemaining() == 0ms) {
msg->setTimeRemaining(_timeout);
}
+ uint32_t my_pending_count = 0;
{
std::lock_guard guard(_lock);
if (_closed) {
@@ -89,12 +90,12 @@ SourceSession::send(Message::UP msg)
if (_throttlePolicy) {
_throttlePolicy->processMessage(*msg);
}
- ++_pendingCount;
+ my_pending_count = ++_pendingCount;
}
if (msg->getTrace().shouldTrace(TraceLevel::COMPONENT)) {
msg->getTrace().trace(TraceLevel::COMPONENT,
make_string("Source session accepted a %d byte message. %d message(s) now pending.",
- msg->getApproxSize(), _pendingCount));
+ msg->getApproxSize(), my_pending_count));
}
msg->pushHandler(*this);
_sequencer.handleMessage(std::move(msg));
@@ -105,6 +106,7 @@ void
SourceSession::handleReply(Reply::UP reply)
{
bool done;
+ uint32_t my_pending_count = 0;
{
std::lock_guard guard(_lock);
assert(_pendingCount > 0);
@@ -112,11 +114,12 @@ SourceSession::handleReply(Reply::UP reply)
if (_throttlePolicy) {
_throttlePolicy->processReply(*reply);
}
+ my_pending_count = _pendingCount;
done = (_closed && _pendingCount == 0);
}
if (reply->getTrace().shouldTrace(TraceLevel::COMPONENT)) {
reply->getTrace().trace(TraceLevel::COMPONENT,
- make_string("Source session received reply. %d message(s) now pending.", _pendingCount));
+ make_string("Source session received reply. %d message(s) now pending.", my_pending_count));
}
IReplyHandler &handler = reply->getCallStack().pop(*reply);
handler.handleReply(std::move(reply));