summaryrefslogtreecommitdiffstats
path: root/messagebus
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2017-01-06 12:22:26 +0100
committerHenning Baldersheim <balder@yahoo-inc.com>2017-01-06 12:22:26 +0100
commit26cea98ce8096e95fadf8d2d88db36a25cbde3e1 (patch)
treefc8862299a447cb7088e34f758f08536a31a8478 /messagebus
parent71915f0cf36924b290f7a6196a2b9c637e4c9570 (diff)
Expire stalled messages in BlockedQ earlier.
Diffstat (limited to 'messagebus')
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java17
1 files changed, 14 insertions, 3 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java b/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java
index 554b093df83..892fe505687 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java
@@ -9,6 +9,7 @@ import com.yahoo.text.Utf8String;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Predicate;
import java.util.logging.Logger;
/**
@@ -263,7 +264,7 @@ public final class SourceSession implements ReplyHandler, Runnable {
Result res = send(msg);
if (isSendQFull(res)) {
BlockedMessage blockedMessage = new BlockedMessage(msg);
- synchronized (lock) {
+ synchronized (blockedQ) {
blockedQ.add(blockedMessage);
}
res = blockedMessage.waitComplete();
@@ -271,14 +272,24 @@ public final class SourceSession implements ReplyHandler, Runnable {
return res;
}
+ private void expireStalledBlockedMessages() {
+ blockedQ.removeIf(new Predicate<BlockedMessage>() {
+ @Override
+ public boolean test(BlockedMessage blockedMessage) {
+ return blockedMessage.getMessage().isExpired();
+ }
+ });
+ }
+
private void sendBlockedMessages() {
- synchronized (lock) {
+ synchronized (blockedQ) {
for (boolean success = true; success && !blockedQ.isEmpty(); ) {
success = blockedQ.element().sendOrExpire();
if (success) {
blockedQ.remove();
}
}
+ expireStalledBlockedMessages();
}
}
@@ -295,8 +306,8 @@ public final class SourceSession implements ReplyHandler, Runnable {
throttlePolicy.processReply(reply);
}
done = (closed && pendingCount == 0);
- sendBlockedMessages();
}
+ sendBlockedMessages();
if (reply.getTrace().shouldTrace(TraceLevel.COMPONENT)) {
reply.getTrace().trace(TraceLevel.COMPONENT,
"Source session received reply. " + pendingCount + " message(s) now pending.");