diff options
Diffstat (limited to 'messagebus')
-rw-r--r-- | messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java | 12 |
1 files changed, 6 insertions, 6 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java b/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java index f4aed92e8ce..4017252715e 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java @@ -4,6 +4,7 @@ package com.yahoo.messagebus; import com.yahoo.messagebus.routing.Route; import com.yahoo.messagebus.routing.RoutingTable; +import java.util.Deque; import java.util.LinkedList; import java.util.Queue; import java.util.concurrent.CountDownLatch; @@ -26,7 +27,7 @@ public final class SourceSession implements ReplyHandler, MessageBus.SendBlocked private volatile double timeout; private volatile int pendingCount = 0; private volatile boolean closed = false; - private final Queue<BlockedMessage> blockedQ = new LinkedList<>(); + private final Deque<BlockedMessage> blockedQ = new LinkedList<>(); /** * The default constructor requires values for all final member variables @@ -174,8 +175,6 @@ public final class SourceSession implements ReplyHandler, MessageBus.SendBlocked } } - Message getMessage() { return msg; } - boolean notifyIfExpired() { if (msg.isExpired()) { Error error = new Error(ErrorCode.TIMEOUT, "Timed out in sendQ"); @@ -252,9 +251,10 @@ public final class SourceSession implements ReplyHandler, MessageBus.SendBlocked private void sendBlockedMessages() { synchronized (lock) { for (boolean success = true; success && !blockedQ.isEmpty(); ) { - success = blockedQ.element().sendOrExpire(); - if (success) { - blockedQ.remove(); + BlockedMessage msg = blockedQ.remove(); + success = msg.sendOrExpire(); + if ( ! success) { + blockedQ.addFirst(msg); } } } |