diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2017-01-06 13:08:49 +0100 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2017-01-06 13:08:49 +0100 |
commit | f57f93c47f138ca9af6f0275ad6912cbbefc9439 (patch) | |
tree | 26c3da3d6ebb1170f28d4e3750054dd00ab86603 /messagebus | |
parent | 26cea98ce8096e95fadf8d2d88db36a25cbde3e1 (diff) |
Only do early expire in interval driven thread, not on every reply. Also use the same lock to avoid Q jumping messages.
Diffstat (limited to 'messagebus')
-rw-r--r-- | messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java | 34 |
1 files changed, 22 insertions, 12 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java b/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java index 892fe505687..f64acd05bc4 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java @@ -173,12 +173,15 @@ public final class SourceSession implements ReplyHandler, Runnable { public void run() { while (!closed) { sendBlockedMessages(); + expireStalledBlockedMessages(); try { Thread.sleep(10); } catch (InterruptedException e) { return; } } + sendBlockedMessages(); + expireStalledBlockedMessages(); } private class BlockedMessage { @@ -188,7 +191,7 @@ public final class SourceSession implements ReplyHandler, Runnable { this.msg = msg; } - private synchronized void notifyFailure(Result result) { + private void notifyFailure(Result result) { synchronized (this) { this.result = result; notify(); @@ -203,12 +206,18 @@ public final class SourceSession implements ReplyHandler, Runnable { Message getMessage() { return msg; } - boolean sendOrExpire() { + boolean notifyIfExpired() { if (msg.isExpired()) { Error error = new Error(ErrorCode.TIMEOUT, "Timed out in sendQ"); notifyFailure(new Result(error)); replyHandler.handleReply(new SendTimeoutReply(msg, error)); - } else { + return true; + } + return false; + } + + boolean sendOrExpire() { + if ( ! notifyIfExpired() ) { Result res = sendInternal(msg); if ( ! isSendQFull(res) ) { notifySuccess(res); @@ -264,7 +273,7 @@ public final class SourceSession implements ReplyHandler, Runnable { Result res = send(msg); if (isSendQFull(res)) { BlockedMessage blockedMessage = new BlockedMessage(msg); - synchronized (blockedQ) { + synchronized (lock) { blockedQ.add(blockedMessage); } res = blockedMessage.waitComplete(); @@ -273,23 +282,24 @@ public final class SourceSession implements ReplyHandler, Runnable { } private void expireStalledBlockedMessages() { - blockedQ.removeIf(new Predicate<BlockedMessage>() { - @Override - public boolean test(BlockedMessage blockedMessage) { - return blockedMessage.getMessage().isExpired(); + synchronized (lock) { + final Iterator<BlockedMessage> each = blockedQ.iterator(); + while (each.hasNext()) { + if (each.next().notifyIfExpired()) { + each.remove(); + } } - }); + } } private void sendBlockedMessages() { - synchronized (blockedQ) { + synchronized (lock) { for (boolean success = true; success && !blockedQ.isEmpty(); ) { success = blockedQ.element().sendOrExpire(); if (success) { blockedQ.remove(); } } - expireStalledBlockedMessages(); } } @@ -306,8 +316,8 @@ public final class SourceSession implements ReplyHandler, Runnable { throttlePolicy.processReply(reply); } done = (closed && pendingCount == 0); + sendBlockedMessages(); } - sendBlockedMessages(); if (reply.getTrace().shouldTrace(TraceLevel.COMPONENT)) { reply.getTrace().trace(TraceLevel.COMPONENT, "Source session received reply. " + pendingCount + " message(s) now pending."); |