diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2023-10-10 10:32:23 +0200 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2023-10-10 10:32:52 +0200 |
commit | a2e0b524d650d395a0486b752c21fe251b382436 (patch) | |
tree | 95938a89ad5f5893cb76462c8a7b11ebcfa2f54d /messagebus | |
parent | cdac5165107db05d26832b4459a66ae4098b57e3 (diff) |
Get help from the messenger thread to break possibly very long recursion.
Diffstat (limited to 'messagebus')
4 files changed, 27 insertions, 7 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java b/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java index 4b911d7c38e..4a54fa5b9c3 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java @@ -185,6 +185,8 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler, msn.start(); } + Messenger messenger() { return msn; } + /** * <p>Sets the destroyed flag to true. The very first time this method is * called, it cleans up all its dependencies. Even if you retain a reference diff --git a/messagebus/src/main/java/com/yahoo/messagebus/Sequencer.java b/messagebus/src/main/java/com/yahoo/messagebus/Sequencer.java index 84ef5800143..4382f6cb798 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/Sequencer.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/Sequencer.java @@ -20,14 +20,18 @@ public class Sequencer implements MessageHandler, ReplyHandler { private final AtomicBoolean destroyed = new AtomicBoolean(false); private final MessageHandler sender; private final Map<Long, Queue<Message>> seqMap = new HashMap<>(); + private final Messenger msn; + private final static Object BUSY = new Object(); + private final static ThreadLocal<Object> isSending = new ThreadLocal<>(); /** * Constructs a new sequencer on top of the given async sender. * * @param sender The underlying sender. */ - public Sequencer(MessageHandler sender) { + public Sequencer(MessageHandler sender, Messenger msn) { this.sender = sender; + this.msn = msn; } /** @@ -133,9 +137,16 @@ public class Sequencer implements MessageHandler, ReplyHandler { reply.getTrace().trace(TraceLevel.COMPONENT, "Sequencer received reply with sequence id '" + seqId + "'."); } + sendNextInSequence(seqId); ReplyHandler handler = reply.popHandler(); handler.handleReply(reply); - sendNextInSequence(seqId); + } + + private class SequencedSendTask implements Messenger.Task { + private final Message msg; + SequencedSendTask(Message msg) { this.msg = msg; } + @Override public void run() { sequencedSend(msg); } + @Override public void destroy() { msg.discard(); } } private void sendNextInSequence(long seqId) { @@ -149,7 +160,15 @@ public class Sequencer implements MessageHandler, ReplyHandler { } } if (msg != null) { - sequencedSend(msg); + Object alreadySending = isSending.get(); + if ((alreadySending == BUSY) && (msn != null)) { + // Dispatch in another thread to break possibly very long recursion. + msn.enqueue(new SequencedSendTask(msg)); + } else { + isSending.set(BUSY); + sequencedSend(msg); + } + isSending.set(null); } } diff --git a/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java b/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java index 53fd49dcf58..2db8c7655e1 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java @@ -48,7 +48,7 @@ public final class SourceSession implements ReplyHandler, MessageBus.SendBlocked */ SourceSession(MessageBus mbus, SourceSessionParams params) { this.mbus = mbus; - sequencer = new Sequencer(mbus); + sequencer = new Sequencer(mbus, mbus.messenger()); if (!params.hasReplyHandler()) { throw new NullPointerException("Reply handler is null."); } diff --git a/messagebus/src/test/java/com/yahoo/messagebus/SequencerTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/SequencerTestCase.java index c3166a578b5..d44bb9235f9 100644 --- a/messagebus/src/test/java/com/yahoo/messagebus/SequencerTestCase.java +++ b/messagebus/src/test/java/com/yahoo/messagebus/SequencerTestCase.java @@ -19,7 +19,7 @@ public class SequencerTestCase { TestQueue src = new TestQueue(); TestQueue dst = new TestQueue(); QueueSender sender = new QueueSender(dst); - Sequencer seq = new Sequencer(sender); + Sequencer seq = new Sequencer(sender, null); seq.handleMessage(src.createMessage(false, 0)); seq.handleMessage(src.createMessage(false, 0)); @@ -51,7 +51,7 @@ public class SequencerTestCase { TestQueue src = new TestQueue(); TestQueue dst = new TestQueue(); QueueSender sender = new QueueSender(dst); - Sequencer seq = new Sequencer(sender); + Sequencer seq = new Sequencer(sender, null); seq.handleMessage(src.createMessage(true, 1L)); seq.handleMessage(src.createMessage(true, 2L)); @@ -102,7 +102,6 @@ public class SequencerTestCase { assertEquals(0, dst.size()); } - @SuppressWarnings("serial") private static class TestQueue extends LinkedList<Routable> implements ReplyHandler { void checkReply(boolean hasSeqId, long seqId) { |