diff options
Diffstat (limited to 'messagebus/src/main/java/com/yahoo/messagebus/Sequencer.java')
-rw-r--r-- | messagebus/src/main/java/com/yahoo/messagebus/Sequencer.java | 41 |
1 files changed, 31 insertions, 10 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/Sequencer.java b/messagebus/src/main/java/com/yahoo/messagebus/Sequencer.java index 15bc15f88b8..4e10a72c858 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/Sequencer.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/Sequencer.java @@ -1,4 +1,4 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.messagebus; import java.util.HashMap; @@ -20,14 +20,20 @@ public class Sequencer implements MessageHandler, ReplyHandler { private final AtomicBoolean destroyed = new AtomicBoolean(false); private final MessageHandler sender; private final Map<Long, Queue<Message>> seqMap = new HashMap<>(); + private final Messenger msn; + private final static ThreadLocal<Boolean> isSending = ThreadLocal.withInitial(() -> Boolean.FALSE); /** * Constructs a new sequencer on top of the given async sender. * * @param sender The underlying sender. */ - public Sequencer(MessageHandler sender) { + public Sequencer(MessageHandler sender, Messenger msn) { this.sender = sender; + this.msn = msn; + } + public Sequencer(MessageHandler sender) { + this(sender, null); } /** @@ -67,11 +73,7 @@ public class Sequencer implements MessageHandler, ReplyHandler { msg.setContext(seqId); synchronized (this) { if (seqMap.containsKey(seqId)) { - Queue<Message> queue = seqMap.get(seqId); - if (queue == null) { - queue = new LinkedList<>(); - seqMap.put(seqId, queue); - } + Queue<Message> queue = seqMap.computeIfAbsent(seqId, k -> new LinkedList<>()); if (msg.getTrace().shouldTrace(TraceLevel.COMPONENT)) { msg.getTrace().trace(TraceLevel.COMPONENT, "Sequencer queued message with sequence id '" + seqId + "'."); @@ -137,6 +139,19 @@ public class Sequencer implements MessageHandler, ReplyHandler { reply.getTrace().trace(TraceLevel.COMPONENT, "Sequencer received reply with sequence id '" + seqId + "'."); } + sendNextInSequence(seqId); + ReplyHandler handler = reply.popHandler(); + handler.handleReply(reply); + } + + private class SequencedSendTask implements Messenger.Task { + private Message msg; + SequencedSendTask(Message msg) { this.msg = msg; } + @Override public void run() { sequencedSend(msg); msg = null; } + @Override public void destroy() { if (msg != null) msg.discard(); } + } + + private void sendNextInSequence(long seqId) { Message msg = null; synchronized (this) { Queue<Message> queue = seqMap.get(seqId); @@ -147,10 +162,16 @@ public class Sequencer implements MessageHandler, ReplyHandler { } } if (msg != null) { - sequencedSend(msg); + Boolean alreadySending = isSending.get(); + if (alreadySending && (msn != null)) { + // Dispatch in another thread to break possibly very long recursion. + msn.enqueue(new SequencedSendTask(msg)); + } else { + isSending.set(Boolean.TRUE); + sequencedSend(msg); + } + isSending.set(Boolean.FALSE); } - ReplyHandler handler = reply.popHandler(); - handler.handleReply(reply); } } |