diff options
Diffstat (limited to 'messagebus/src/main/java/com')
3 files changed, 33 insertions, 10 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java b/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java index 19b75fcfec4..eeea999cc14 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java @@ -185,6 +185,8 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler, msn.start(); } + Messenger messenger() { return msn; } + /** * <p>Sets the destroyed flag to true. The very first time this method is * called, it cleans up all its dependencies. Even if you retain a reference diff --git a/messagebus/src/main/java/com/yahoo/messagebus/Sequencer.java b/messagebus/src/main/java/com/yahoo/messagebus/Sequencer.java index 78ddac5398e..05e71e0f006 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/Sequencer.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/Sequencer.java @@ -20,14 +20,20 @@ public class Sequencer implements MessageHandler, ReplyHandler { private final AtomicBoolean destroyed = new AtomicBoolean(false); private final MessageHandler sender; private final Map<Long, Queue<Message>> seqMap = new HashMap<>(); + private final Messenger msn; + private final static ThreadLocal<Boolean> isSending = ThreadLocal.withInitial(() -> Boolean.FALSE); /** * Constructs a new sequencer on top of the given async sender. * * @param sender The underlying sender. */ - public Sequencer(MessageHandler sender) { + public Sequencer(MessageHandler sender, Messenger msn) { this.sender = sender; + this.msn = msn; + } + public Sequencer(MessageHandler sender) { + this(sender, null); } /** @@ -67,11 +73,7 @@ public class Sequencer implements MessageHandler, ReplyHandler { msg.setContext(seqId); synchronized (this) { if (seqMap.containsKey(seqId)) { - Queue<Message> queue = seqMap.get(seqId); - if (queue == null) { - queue = new LinkedList<>(); - seqMap.put(seqId, queue); - } + Queue<Message> queue = seqMap.computeIfAbsent(seqId, k -> new LinkedList<>()); if (msg.getTrace().shouldTrace(TraceLevel.COMPONENT)) { msg.getTrace().trace(TraceLevel.COMPONENT, "Sequencer queued message with sequence id '" + seqId + "'."); @@ -137,6 +139,19 @@ public class Sequencer implements MessageHandler, ReplyHandler { reply.getTrace().trace(TraceLevel.COMPONENT, "Sequencer received reply with sequence id '" + seqId + "'."); } + sendNextInSequence(seqId); + ReplyHandler handler = reply.popHandler(); + handler.handleReply(reply); + } + + private class SequencedSendTask implements Messenger.Task { + private final Message msg; + SequencedSendTask(Message msg) { this.msg = msg; } + @Override public void run() { sequencedSend(msg); } + @Override public void destroy() { } + } + + private void sendNextInSequence(long seqId) { Message msg = null; synchronized (this) { Queue<Message> queue = seqMap.get(seqId); @@ -147,10 +162,16 @@ public class Sequencer implements MessageHandler, ReplyHandler { } } if (msg != null) { - sequencedSend(msg); + Boolean alreadySending = isSending.get(); + if (alreadySending && (msn != null)) { + // Dispatch in another thread to break possibly very long recursion. + msn.enqueue(new SequencedSendTask(msg)); + } else { + isSending.set(Boolean.TRUE); + sequencedSend(msg); + } + isSending.set(Boolean.FALSE); } - ReplyHandler handler = reply.popHandler(); - handler.handleReply(reply); } } diff --git a/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java b/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java index 3b53e46956b..75751b1e2ce 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java @@ -48,7 +48,7 @@ public final class SourceSession implements ReplyHandler, MessageBus.SendBlocked */ SourceSession(MessageBus mbus, SourceSessionParams params) { this.mbus = mbus; - sequencer = new Sequencer(mbus); + sequencer = new Sequencer(mbus, mbus.messenger()); if (!params.hasReplyHandler()) { throw new NullPointerException("Reply handler is null."); } |