summaryrefslogtreecommitdiffstats
path: root/messagebus
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2019-08-08 18:06:20 +0200
committerHenning Baldersheim <balder@yahoo-inc.com>2019-08-08 18:06:20 +0200
commit6fcdf0c3dddcfbc183fddbcc39a00332a80ed0a8 (patch)
tree18c4052f67f35e8809206547ceb7fa4cb151d68d /messagebus
parentf02007471df3064dbdc2f53157e071df430eeaec (diff)
Use a thread local counter to break recursion if handleReply is called synchronously during send.
Diffstat (limited to 'messagebus')
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java26
1 files changed, 20 insertions, 6 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java b/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java
index 53db81a28a8..a2de4d8c80e 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java
@@ -27,6 +27,13 @@ public final class SourceSession implements ReplyHandler, MessageBus.SendBlocked
private volatile int pendingCount = 0;
private volatile boolean closed = false;
private final Deque<BlockedMessage> blockedQ = new LinkedList<>();
+ private final static class Counter {
+ private int count = 0;
+ void inc() { count ++; }
+ void dec() { count --; }
+ boolean enough() { return count > 5; }
+ }
+ private static ThreadLocal<Counter> sendBlockedRecurseLevel = ThreadLocal.withInitial(Counter::new);
/**
* The default constructor requires values for all final member variables
@@ -248,14 +255,21 @@ public final class SourceSession implements ReplyHandler, MessageBus.SendBlocked
}
private void sendBlockedMessages() {
- synchronized (lock) {
- for (boolean success = true; success && !blockedQ.isEmpty(); ) {
- BlockedMessage msg = blockedQ.remove();
- success = msg.sendOrExpire();
- if ( ! success) {
- blockedQ.addFirst(msg);
+ Counter recurselevel = sendBlockedRecurseLevel.get();
+ if (recurselevel.enough()) return;
+ try {
+ recurselevel.inc();
+ synchronized (lock) {
+ for (boolean success = true; success && !blockedQ.isEmpty(); ) {
+ BlockedMessage msg = blockedQ.remove();
+ success = msg.sendOrExpire();
+ if (!success) {
+ blockedQ.addFirst(msg);
+ }
}
}
+ } finally {
+ recurselevel.dec();
}
}