summaryrefslogtreecommitdiffstats
path: root/messagebus
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2019-09-13 11:56:10 +0200
committerHenning Baldersheim <balder@yahoo-inc.com>2019-09-13 11:56:10 +0200
commit37fb72343d08777b96e47c438feedb7f520ed42a (patch)
treedc43c08cd34d858edb7d769e9651b7fc1e71979e /messagebus
parentb505c37019302e404df15b346ed0513fa8a83762 (diff)
- Use a token to limit how many that will try resending.
- Use a dedicated lock for the blockedQ. - hold no lock when doing resend.
Diffstat (limited to 'messagebus')
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java31
1 files changed, 22 insertions, 9 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java b/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java
index a2de4d8c80e..0b731a4c6fb 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java
@@ -18,12 +18,13 @@ public final class SourceSession implements ReplyHandler, MessageBus.SendBlocked
private final AtomicBoolean destroyed = new AtomicBoolean(false);
private final CountDownLatch done = new CountDownLatch(1);
+ private final AtomicBoolean sendingBlockedToken = new AtomicBoolean(false);
private final Object lock = new Object();
private final MessageBus mbus;
private final Sequencer sequencer;
private final ReplyHandler replyHandler;
private final ThrottlePolicy throttlePolicy;
- private volatile double timeout;
+ private volatile double timeout; // volatile only for tests
private volatile int pendingCount = 0;
private volatile boolean closed = false;
private final Deque<BlockedMessage> blockedQ = new LinkedList<>();
@@ -240,7 +241,7 @@ public final class SourceSession implements ReplyHandler, MessageBus.SendBlocked
Result res = send(msg);
if (isSendQFull(res)) {
BlockedMessage blockedMessage = new BlockedMessage(msg);
- synchronized (lock) {
+ synchronized (blockedQ) {
blockedQ.add(blockedMessage);
}
res = blockedMessage.waitComplete();
@@ -249,27 +250,39 @@ public final class SourceSession implements ReplyHandler, MessageBus.SendBlocked
}
private void expireStalledBlockedMessages() {
- synchronized (lock) {
+ synchronized (blockedQ) {
blockedQ.removeIf(BlockedMessage::notifyIfExpired);
}
}
+ private BlockedMessage getNextBlockedMessage() {
+ synchronized (blockedQ) {
+ return blockedQ.poll();
+ }
+ }
+
private void sendBlockedMessages() {
Counter recurselevel = sendBlockedRecurseLevel.get();
if (recurselevel.enough()) return;
+ boolean someoneElseIsTakingCareOfIt = sendingBlockedToken.getAndSet(true);
+ if (someoneElseIsTakingCareOfIt) return;
try {
recurselevel.inc();
- synchronized (lock) {
- for (boolean success = true; success && !blockedQ.isEmpty(); ) {
- BlockedMessage msg = blockedQ.remove();
- success = msg.sendOrExpire();
- if (!success) {
+ BlockedMessage msg = getNextBlockedMessage();
+ for (boolean success = true; success && msg != null; ) {
+ success = msg.sendOrExpire();
+ if (!success) {
+ // Failed sending, put it back at the head of the Q.
+ synchronized (blockedQ) {
blockedQ.addFirst(msg);
}
+ } else {
+ msg = getNextBlockedMessage();
}
}
} finally {
recurselevel.dec();
+ sendingBlockedToken.set(false);
}
}
@@ -286,8 +299,8 @@ public final class SourceSession implements ReplyHandler, MessageBus.SendBlocked
throttlePolicy.processReply(reply);
}
done = (closed && pendingCount == 0);
- sendBlockedMessages();
}
+ sendBlockedMessages();
if (reply.getTrace().shouldTrace(TraceLevel.COMPONENT)) {
reply.getTrace().trace(TraceLevel.COMPONENT,
"Source session received reply. " + pendingCount + " message(s) now pending.");