diff options
author | Håvard Pettersen <havardpe@oath.com> | 2022-03-14 10:54:24 +0000 |
---|---|---|
committer | Håvard Pettersen <havardpe@oath.com> | 2022-03-14 10:54:24 +0000 |
commit | 424ba43f3c8f677c04fdc4dc840cbd8b5f0e9034 (patch) | |
tree | b29975bb6b44ee7ee6085bcabce72c50d11716f5 /messagebus | |
parent | 7cf63413a6b1873d901acea154e40ccbf4d8147b (diff) |
avoid race in trace messages regarding pending count
Diffstat (limited to 'messagebus')
-rw-r--r-- | messagebus/src/vespa/messagebus/sourcesession.cpp | 9 |
1 files changed, 6 insertions, 3 deletions
diff --git a/messagebus/src/vespa/messagebus/sourcesession.cpp b/messagebus/src/vespa/messagebus/sourcesession.cpp index 0cf7135a1db..d4440b60895 100644 --- a/messagebus/src/vespa/messagebus/sourcesession.cpp +++ b/messagebus/src/vespa/messagebus/sourcesession.cpp @@ -75,6 +75,7 @@ SourceSession::send(Message::UP msg) if (msg->getTimeRemaining() == 0ms) { msg->setTimeRemaining(_timeout); } + uint32_t my_pending_count = 0; { std::lock_guard guard(_lock); if (_closed) { @@ -89,12 +90,12 @@ SourceSession::send(Message::UP msg) if (_throttlePolicy) { _throttlePolicy->processMessage(*msg); } - ++_pendingCount; + my_pending_count = ++_pendingCount; } if (msg->getTrace().shouldTrace(TraceLevel::COMPONENT)) { msg->getTrace().trace(TraceLevel::COMPONENT, make_string("Source session accepted a %d byte message. %d message(s) now pending.", - msg->getApproxSize(), _pendingCount)); + msg->getApproxSize(), my_pending_count)); } msg->pushHandler(*this); _sequencer.handleMessage(std::move(msg)); @@ -105,6 +106,7 @@ void SourceSession::handleReply(Reply::UP reply) { bool done; + uint32_t my_pending_count = 0; { std::lock_guard guard(_lock); assert(_pendingCount > 0); @@ -112,11 +114,12 @@ SourceSession::handleReply(Reply::UP reply) if (_throttlePolicy) { _throttlePolicy->processReply(*reply); } + my_pending_count = _pendingCount; done = (_closed && _pendingCount == 0); } if (reply->getTrace().shouldTrace(TraceLevel::COMPONENT)) { reply->getTrace().trace(TraceLevel::COMPONENT, - make_string("Source session received reply. %d message(s) now pending.", _pendingCount)); + make_string("Source session received reply. %d message(s) now pending.", my_pending_count)); } IReplyHandler &handler = reply->getCallStack().pop(*reply); handler.handleReply(std::move(reply)); |