diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2017-01-06 12:22:26 +0100 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2017-01-06 12:22:26 +0100 |
commit | 26cea98ce8096e95fadf8d2d88db36a25cbde3e1 (patch) | |
tree | fc8862299a447cb7088e34f758f08536a31a8478 /messagebus | |
parent | 71915f0cf36924b290f7a6196a2b9c637e4c9570 (diff) |
Expire stalled messages in BlockedQ earlier.
Diffstat (limited to 'messagebus')
-rw-r--r-- | messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java | 17 |
1 files changed, 14 insertions, 3 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java b/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java index 554b093df83..892fe505687 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java @@ -9,6 +9,7 @@ import com.yahoo.text.Utf8String; import java.util.*; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Predicate; import java.util.logging.Logger; /** @@ -263,7 +264,7 @@ public final class SourceSession implements ReplyHandler, Runnable { Result res = send(msg); if (isSendQFull(res)) { BlockedMessage blockedMessage = new BlockedMessage(msg); - synchronized (lock) { + synchronized (blockedQ) { blockedQ.add(blockedMessage); } res = blockedMessage.waitComplete(); @@ -271,14 +272,24 @@ public final class SourceSession implements ReplyHandler, Runnable { return res; } + private void expireStalledBlockedMessages() { + blockedQ.removeIf(new Predicate<BlockedMessage>() { + @Override + public boolean test(BlockedMessage blockedMessage) { + return blockedMessage.getMessage().isExpired(); + } + }); + } + private void sendBlockedMessages() { - synchronized (lock) { + synchronized (blockedQ) { for (boolean success = true; success && !blockedQ.isEmpty(); ) { success = blockedQ.element().sendOrExpire(); if (success) { blockedQ.remove(); } } + expireStalledBlockedMessages(); } } @@ -295,8 +306,8 @@ public final class SourceSession implements ReplyHandler, Runnable { throttlePolicy.processReply(reply); } done = (closed && pendingCount == 0); - sendBlockedMessages(); } + sendBlockedMessages(); if (reply.getTrace().shouldTrace(TraceLevel.COMPONENT)) { reply.getTrace().trace(TraceLevel.COMPONENT, "Source session received reply. " + pendingCount + " message(s) now pending."); |