summaryrefslogtreecommitdiffstats
path: root/messagebus
diff options
context:
space:
mode:
Diffstat (limited to 'messagebus')
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java12
1 files changed, 6 insertions, 6 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java b/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java
index f4aed92e8ce..4017252715e 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java
@@ -4,6 +4,7 @@ package com.yahoo.messagebus;
import com.yahoo.messagebus.routing.Route;
import com.yahoo.messagebus.routing.RoutingTable;
+import java.util.Deque;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
@@ -26,7 +27,7 @@ public final class SourceSession implements ReplyHandler, MessageBus.SendBlocked
private volatile double timeout;
private volatile int pendingCount = 0;
private volatile boolean closed = false;
- private final Queue<BlockedMessage> blockedQ = new LinkedList<>();
+ private final Deque<BlockedMessage> blockedQ = new LinkedList<>();
/**
* The default constructor requires values for all final member variables
@@ -174,8 +175,6 @@ public final class SourceSession implements ReplyHandler, MessageBus.SendBlocked
}
}
- Message getMessage() { return msg; }
-
boolean notifyIfExpired() {
if (msg.isExpired()) {
Error error = new Error(ErrorCode.TIMEOUT, "Timed out in sendQ");
@@ -252,9 +251,10 @@ public final class SourceSession implements ReplyHandler, MessageBus.SendBlocked
private void sendBlockedMessages() {
synchronized (lock) {
for (boolean success = true; success && !blockedQ.isEmpty(); ) {
- success = blockedQ.element().sendOrExpire();
- if (success) {
- blockedQ.remove();
+ BlockedMessage msg = blockedQ.remove();
+ success = msg.sendOrExpire();
+ if ( ! success) {
+ blockedQ.addFirst(msg);
}
}
}