summaryrefslogtreecommitdiffstats
path: root/messagebus
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2019-08-08 17:51:10 +0200
committerGitHub <noreply@github.com>2019-08-08 17:51:10 +0200
commitf02007471df3064dbdc2f53157e071df430eeaec (patch)
treea81d7f4495037d8b11212ad58309e6dcea724539 /messagebus
parent36470a3776ba8e848fdd8c45f8ef4328e858c130 (diff)
parentacb30eb07d139c1998ddbfed4819ff6eeb2ef9a9 (diff)
Merge pull request #10217 from vespa-engine/balder/do-not-send-blocked-in-handlereply
Take the message out of the send Q in order to ensure progress if han…
Diffstat (limited to 'messagebus')
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java13
1 files changed, 6 insertions, 7 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java b/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java
index f4aed92e8ce..53db81a28a8 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java
@@ -4,8 +4,8 @@ package com.yahoo.messagebus;
import com.yahoo.messagebus.routing.Route;
import com.yahoo.messagebus.routing.RoutingTable;
+import java.util.Deque;
import java.util.LinkedList;
-import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -26,7 +26,7 @@ public final class SourceSession implements ReplyHandler, MessageBus.SendBlocked
private volatile double timeout;
private volatile int pendingCount = 0;
private volatile boolean closed = false;
- private final Queue<BlockedMessage> blockedQ = new LinkedList<>();
+ private final Deque<BlockedMessage> blockedQ = new LinkedList<>();
/**
* The default constructor requires values for all final member variables
@@ -174,8 +174,6 @@ public final class SourceSession implements ReplyHandler, MessageBus.SendBlocked
}
}
- Message getMessage() { return msg; }
-
boolean notifyIfExpired() {
if (msg.isExpired()) {
Error error = new Error(ErrorCode.TIMEOUT, "Timed out in sendQ");
@@ -252,9 +250,10 @@ public final class SourceSession implements ReplyHandler, MessageBus.SendBlocked
private void sendBlockedMessages() {
synchronized (lock) {
for (boolean success = true; success && !blockedQ.isEmpty(); ) {
- success = blockedQ.element().sendOrExpire();
- if (success) {
- blockedQ.remove();
+ BlockedMessage msg = blockedQ.remove();
+ success = msg.sendOrExpire();
+ if ( ! success) {
+ blockedQ.addFirst(msg);
}
}
}