summaryrefslogtreecommitdiffstats
path: root/messagebus
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2019-08-08 17:25:56 +0200
committerHenning Baldersheim <balder@yahoo-inc.com>2019-08-08 17:25:56 +0200
commit9646479684bc870dac0304127bffe549d56dcb5d (patch)
treedcc033f7464e38dd83b7a2d034f2834953a093ab /messagebus
parent36470a3776ba8e848fdd8c45f8ef4328e858c130 (diff)
Take the message out of the send Q in order to ensure progress if handleReply is called in sender context.
Diffstat (limited to 'messagebus')
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java12
1 files changed, 6 insertions, 6 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java b/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java
index f4aed92e8ce..4017252715e 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java
@@ -4,6 +4,7 @@ package com.yahoo.messagebus;
import com.yahoo.messagebus.routing.Route;
import com.yahoo.messagebus.routing.RoutingTable;
+import java.util.Deque;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
@@ -26,7 +27,7 @@ public final class SourceSession implements ReplyHandler, MessageBus.SendBlocked
private volatile double timeout;
private volatile int pendingCount = 0;
private volatile boolean closed = false;
- private final Queue<BlockedMessage> blockedQ = new LinkedList<>();
+ private final Deque<BlockedMessage> blockedQ = new LinkedList<>();
/**
* The default constructor requires values for all final member variables
@@ -174,8 +175,6 @@ public final class SourceSession implements ReplyHandler, MessageBus.SendBlocked
}
}
- Message getMessage() { return msg; }
-
boolean notifyIfExpired() {
if (msg.isExpired()) {
Error error = new Error(ErrorCode.TIMEOUT, "Timed out in sendQ");
@@ -252,9 +251,10 @@ public final class SourceSession implements ReplyHandler, MessageBus.SendBlocked
private void sendBlockedMessages() {
synchronized (lock) {
for (boolean success = true; success && !blockedQ.isEmpty(); ) {
- success = blockedQ.element().sendOrExpire();
- if (success) {
- blockedQ.remove();
+ BlockedMessage msg = blockedQ.remove();
+ success = msg.sendOrExpire();
+ if ( ! success) {
+ blockedQ.addFirst(msg);
}
}
}