diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2019-08-08 18:06:20 +0200 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2019-08-08 18:06:20 +0200 |
commit | 6fcdf0c3dddcfbc183fddbcc39a00332a80ed0a8 (patch) | |
tree | 18c4052f67f35e8809206547ceb7fa4cb151d68d /messagebus | |
parent | f02007471df3064dbdc2f53157e071df430eeaec (diff) |
Use a thread local counter to break recursion if handleReply is called synchronously during send.
Diffstat (limited to 'messagebus')
-rw-r--r-- | messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java | 26 |
1 files changed, 20 insertions, 6 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java b/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java index 53db81a28a8..a2de4d8c80e 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java @@ -27,6 +27,13 @@ public final class SourceSession implements ReplyHandler, MessageBus.SendBlocked private volatile int pendingCount = 0; private volatile boolean closed = false; private final Deque<BlockedMessage> blockedQ = new LinkedList<>(); + private final static class Counter { + private int count = 0; + void inc() { count ++; } + void dec() { count --; } + boolean enough() { return count > 5; } + } + private static ThreadLocal<Counter> sendBlockedRecurseLevel = ThreadLocal.withInitial(Counter::new); /** * The default constructor requires values for all final member variables @@ -248,14 +255,21 @@ public final class SourceSession implements ReplyHandler, MessageBus.SendBlocked } private void sendBlockedMessages() { - synchronized (lock) { - for (boolean success = true; success && !blockedQ.isEmpty(); ) { - BlockedMessage msg = blockedQ.remove(); - success = msg.sendOrExpire(); - if ( ! success) { - blockedQ.addFirst(msg); + Counter recurselevel = sendBlockedRecurseLevel.get(); + if (recurselevel.enough()) return; + try { + recurselevel.inc(); + synchronized (lock) { + for (boolean success = true; success && !blockedQ.isEmpty(); ) { + BlockedMessage msg = blockedQ.remove(); + success = msg.sendOrExpire(); + if (!success) { + blockedQ.addFirst(msg); + } } } + } finally { + recurselevel.dec(); } } |