summaryrefslogtreecommitdiffstats
path: root/messagebus
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2017-01-06 13:08:49 +0100
committerHenning Baldersheim <balder@yahoo-inc.com>2017-01-06 13:08:49 +0100
commitf57f93c47f138ca9af6f0275ad6912cbbefc9439 (patch)
tree26c3da3d6ebb1170f28d4e3750054dd00ab86603 /messagebus
parent26cea98ce8096e95fadf8d2d88db36a25cbde3e1 (diff)
Only do early expire in interval driven thread, not on every reply. Also use the same lock to avoid Q jumping messages.
Diffstat (limited to 'messagebus')
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java34
1 files changed, 22 insertions, 12 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java b/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java
index 892fe505687..f64acd05bc4 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java
@@ -173,12 +173,15 @@ public final class SourceSession implements ReplyHandler, Runnable {
public void run() {
while (!closed) {
sendBlockedMessages();
+ expireStalledBlockedMessages();
try {
Thread.sleep(10);
} catch (InterruptedException e) {
return;
}
}
+ sendBlockedMessages();
+ expireStalledBlockedMessages();
}
private class BlockedMessage {
@@ -188,7 +191,7 @@ public final class SourceSession implements ReplyHandler, Runnable {
this.msg = msg;
}
- private synchronized void notifyFailure(Result result) {
+ private void notifyFailure(Result result) {
synchronized (this) {
this.result = result;
notify();
@@ -203,12 +206,18 @@ public final class SourceSession implements ReplyHandler, Runnable {
Message getMessage() { return msg; }
- boolean sendOrExpire() {
+ boolean notifyIfExpired() {
if (msg.isExpired()) {
Error error = new Error(ErrorCode.TIMEOUT, "Timed out in sendQ");
notifyFailure(new Result(error));
replyHandler.handleReply(new SendTimeoutReply(msg, error));
- } else {
+ return true;
+ }
+ return false;
+ }
+
+ boolean sendOrExpire() {
+ if ( ! notifyIfExpired() ) {
Result res = sendInternal(msg);
if ( ! isSendQFull(res) ) {
notifySuccess(res);
@@ -264,7 +273,7 @@ public final class SourceSession implements ReplyHandler, Runnable {
Result res = send(msg);
if (isSendQFull(res)) {
BlockedMessage blockedMessage = new BlockedMessage(msg);
- synchronized (blockedQ) {
+ synchronized (lock) {
blockedQ.add(blockedMessage);
}
res = blockedMessage.waitComplete();
@@ -273,23 +282,24 @@ public final class SourceSession implements ReplyHandler, Runnable {
}
private void expireStalledBlockedMessages() {
- blockedQ.removeIf(new Predicate<BlockedMessage>() {
- @Override
- public boolean test(BlockedMessage blockedMessage) {
- return blockedMessage.getMessage().isExpired();
+ synchronized (lock) {
+ final Iterator<BlockedMessage> each = blockedQ.iterator();
+ while (each.hasNext()) {
+ if (each.next().notifyIfExpired()) {
+ each.remove();
+ }
}
- });
+ }
}
private void sendBlockedMessages() {
- synchronized (blockedQ) {
+ synchronized (lock) {
for (boolean success = true; success && !blockedQ.isEmpty(); ) {
success = blockedQ.element().sendOrExpire();
if (success) {
blockedQ.remove();
}
}
- expireStalledBlockedMessages();
}
}
@@ -306,8 +316,8 @@ public final class SourceSession implements ReplyHandler, Runnable {
throttlePolicy.processReply(reply);
}
done = (closed && pendingCount == 0);
+ sendBlockedMessages();
}
- sendBlockedMessages();
if (reply.getTrace().shouldTrace(TraceLevel.COMPONENT)) {
reply.getTrace().trace(TraceLevel.COMPONENT,
"Source session received reply. " + pendingCount + " message(s) now pending.");