diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2019-09-13 11:56:10 +0200 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2019-09-13 11:56:10 +0200 |
commit | 37fb72343d08777b96e47c438feedb7f520ed42a (patch) | |
tree | dc43c08cd34d858edb7d769e9651b7fc1e71979e /messagebus | |
parent | b505c37019302e404df15b346ed0513fa8a83762 (diff) |
- Use a token to limit how many that will try resending.
- Use a dedicated lock for the blockedQ.
- hold no lock when doing resend.
Diffstat (limited to 'messagebus')
-rw-r--r-- | messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java | 31 |
1 files changed, 22 insertions, 9 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java b/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java index a2de4d8c80e..0b731a4c6fb 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java @@ -18,12 +18,13 @@ public final class SourceSession implements ReplyHandler, MessageBus.SendBlocked private final AtomicBoolean destroyed = new AtomicBoolean(false); private final CountDownLatch done = new CountDownLatch(1); + private final AtomicBoolean sendingBlockedToken = new AtomicBoolean(false); private final Object lock = new Object(); private final MessageBus mbus; private final Sequencer sequencer; private final ReplyHandler replyHandler; private final ThrottlePolicy throttlePolicy; - private volatile double timeout; + private volatile double timeout; // volatile only for tests private volatile int pendingCount = 0; private volatile boolean closed = false; private final Deque<BlockedMessage> blockedQ = new LinkedList<>(); @@ -240,7 +241,7 @@ public final class SourceSession implements ReplyHandler, MessageBus.SendBlocked Result res = send(msg); if (isSendQFull(res)) { BlockedMessage blockedMessage = new BlockedMessage(msg); - synchronized (lock) { + synchronized (blockedQ) { blockedQ.add(blockedMessage); } res = blockedMessage.waitComplete(); @@ -249,27 +250,39 @@ public final class SourceSession implements ReplyHandler, MessageBus.SendBlocked } private void expireStalledBlockedMessages() { - synchronized (lock) { + synchronized (blockedQ) { blockedQ.removeIf(BlockedMessage::notifyIfExpired); } } + private BlockedMessage getNextBlockedMessage() { + synchronized (blockedQ) { + return blockedQ.poll(); + } + } + private void sendBlockedMessages() { Counter recurselevel = sendBlockedRecurseLevel.get(); if (recurselevel.enough()) return; + boolean someoneElseIsTakingCareOfIt = sendingBlockedToken.getAndSet(true); + if (someoneElseIsTakingCareOfIt) return; try { recurselevel.inc(); - synchronized (lock) { - for (boolean success = true; success && !blockedQ.isEmpty(); ) { - BlockedMessage msg = blockedQ.remove(); - success = msg.sendOrExpire(); - if (!success) { + BlockedMessage msg = getNextBlockedMessage(); + for (boolean success = true; success && msg != null; ) { + success = msg.sendOrExpire(); + if (!success) { + // Failed sending, put it back at the head of the Q. + synchronized (blockedQ) { blockedQ.addFirst(msg); } + } else { + msg = getNextBlockedMessage(); } } } finally { recurselevel.dec(); + sendingBlockedToken.set(false); } } @@ -286,8 +299,8 @@ public final class SourceSession implements ReplyHandler, MessageBus.SendBlocked throttlePolicy.processReply(reply); } done = (closed && pendingCount == 0); - sendBlockedMessages(); } + sendBlockedMessages(); if (reply.getTrace().shouldTrace(TraceLevel.COMPONENT)) { reply.getTrace().trace(TraceLevel.COMPONENT, "Source session received reply. " + pendingCount + " message(s) now pending."); |