diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2019-08-08 17:25:56 +0200 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2019-08-08 17:25:56 +0200 |
commit | 9646479684bc870dac0304127bffe549d56dcb5d (patch) | |
tree | dcc033f7464e38dd83b7a2d034f2834953a093ab /messagebus | |
parent | 36470a3776ba8e848fdd8c45f8ef4328e858c130 (diff) |
Take the message out of the send Q in order to ensure progress if handleReply is called in sender context.
Diffstat (limited to 'messagebus')
-rw-r--r-- | messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java | 12 |
1 files changed, 6 insertions, 6 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java b/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java index f4aed92e8ce..4017252715e 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java @@ -4,6 +4,7 @@ package com.yahoo.messagebus; import com.yahoo.messagebus.routing.Route; import com.yahoo.messagebus.routing.RoutingTable; +import java.util.Deque; import java.util.LinkedList; import java.util.Queue; import java.util.concurrent.CountDownLatch; @@ -26,7 +27,7 @@ public final class SourceSession implements ReplyHandler, MessageBus.SendBlocked private volatile double timeout; private volatile int pendingCount = 0; private volatile boolean closed = false; - private final Queue<BlockedMessage> blockedQ = new LinkedList<>(); + private final Deque<BlockedMessage> blockedQ = new LinkedList<>(); /** * The default constructor requires values for all final member variables @@ -174,8 +175,6 @@ public final class SourceSession implements ReplyHandler, MessageBus.SendBlocked } } - Message getMessage() { return msg; } - boolean notifyIfExpired() { if (msg.isExpired()) { Error error = new Error(ErrorCode.TIMEOUT, "Timed out in sendQ"); @@ -252,9 +251,10 @@ public final class SourceSession implements ReplyHandler, MessageBus.SendBlocked private void sendBlockedMessages() { synchronized (lock) { for (boolean success = true; success && !blockedQ.isEmpty(); ) { - success = blockedQ.element().sendOrExpire(); - if (success) { - blockedQ.remove(); + BlockedMessage msg = blockedQ.remove(); + success = msg.sendOrExpire(); + if ( ! success) { + blockedQ.addFirst(msg); } } } |