diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2019-08-08 17:51:10 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-08-08 17:51:10 +0200 |
commit | f02007471df3064dbdc2f53157e071df430eeaec (patch) | |
tree | a81d7f4495037d8b11212ad58309e6dcea724539 /messagebus | |
parent | 36470a3776ba8e848fdd8c45f8ef4328e858c130 (diff) | |
parent | acb30eb07d139c1998ddbfed4819ff6eeb2ef9a9 (diff) |
Merge pull request #10217 from vespa-engine/balder/do-not-send-blocked-in-handlereply
Take the message out of the send Q in order to ensure progress if han…
Diffstat (limited to 'messagebus')
-rw-r--r-- | messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java | 13 |
1 files changed, 6 insertions, 7 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java b/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java index f4aed92e8ce..53db81a28a8 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java @@ -4,8 +4,8 @@ package com.yahoo.messagebus; import com.yahoo.messagebus.routing.Route; import com.yahoo.messagebus.routing.RoutingTable; +import java.util.Deque; import java.util.LinkedList; -import java.util.Queue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; @@ -26,7 +26,7 @@ public final class SourceSession implements ReplyHandler, MessageBus.SendBlocked private volatile double timeout; private volatile int pendingCount = 0; private volatile boolean closed = false; - private final Queue<BlockedMessage> blockedQ = new LinkedList<>(); + private final Deque<BlockedMessage> blockedQ = new LinkedList<>(); /** * The default constructor requires values for all final member variables @@ -174,8 +174,6 @@ public final class SourceSession implements ReplyHandler, MessageBus.SendBlocked } } - Message getMessage() { return msg; } - boolean notifyIfExpired() { if (msg.isExpired()) { Error error = new Error(ErrorCode.TIMEOUT, "Timed out in sendQ"); @@ -252,9 +250,10 @@ public final class SourceSession implements ReplyHandler, MessageBus.SendBlocked private void sendBlockedMessages() { synchronized (lock) { for (boolean success = true; success && !blockedQ.isEmpty(); ) { - success = blockedQ.element().sendOrExpire(); - if (success) { - blockedQ.remove(); + BlockedMessage msg = blockedQ.remove(); + success = msg.sendOrExpire(); + if ( ! success) { + blockedQ.addFirst(msg); } } } |