diff options
5 files changed, 87 insertions, 13 deletions
diff --git a/messagebus/abi-spec.json b/messagebus/abi-spec.json index 15a24f82f75..093b7f3450a 100644 --- a/messagebus/abi-spec.json +++ b/messagebus/abi-spec.json @@ -800,6 +800,7 @@ "public" ], "methods" : [ + "public void <init>(com.yahoo.messagebus.MessageHandler, com.yahoo.messagebus.Messenger)", "public void <init>(com.yahoo.messagebus.MessageHandler)", "public boolean destroy()", "public void handleMessage(com.yahoo.messagebus.Message)", diff --git a/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java b/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java index 19b75fcfec4..eeea999cc14 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java @@ -185,6 +185,8 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler, msn.start(); } + Messenger messenger() { return msn; } + /** * <p>Sets the destroyed flag to true. The very first time this method is * called, it cleans up all its dependencies. Even if you retain a reference diff --git a/messagebus/src/main/java/com/yahoo/messagebus/Sequencer.java b/messagebus/src/main/java/com/yahoo/messagebus/Sequencer.java index 78ddac5398e..05e71e0f006 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/Sequencer.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/Sequencer.java @@ -20,14 +20,20 @@ public class Sequencer implements MessageHandler, ReplyHandler { private final AtomicBoolean destroyed = new AtomicBoolean(false); private final MessageHandler sender; private final Map<Long, Queue<Message>> seqMap = new HashMap<>(); + private final Messenger msn; + private final static ThreadLocal<Boolean> isSending = ThreadLocal.withInitial(() -> Boolean.FALSE); /** * Constructs a new sequencer on top of the given async sender. * * @param sender The underlying sender. */ - public Sequencer(MessageHandler sender) { + public Sequencer(MessageHandler sender, Messenger msn) { this.sender = sender; + this.msn = msn; + } + public Sequencer(MessageHandler sender) { + this(sender, null); } /** @@ -67,11 +73,7 @@ public class Sequencer implements MessageHandler, ReplyHandler { msg.setContext(seqId); synchronized (this) { if (seqMap.containsKey(seqId)) { - Queue<Message> queue = seqMap.get(seqId); - if (queue == null) { - queue = new LinkedList<>(); - seqMap.put(seqId, queue); - } + Queue<Message> queue = seqMap.computeIfAbsent(seqId, k -> new LinkedList<>()); if (msg.getTrace().shouldTrace(TraceLevel.COMPONENT)) { msg.getTrace().trace(TraceLevel.COMPONENT, "Sequencer queued message with sequence id '" + seqId + "'."); @@ -137,6 +139,19 @@ public class Sequencer implements MessageHandler, ReplyHandler { reply.getTrace().trace(TraceLevel.COMPONENT, "Sequencer received reply with sequence id '" + seqId + "'."); } + sendNextInSequence(seqId); + ReplyHandler handler = reply.popHandler(); + handler.handleReply(reply); + } + + private class SequencedSendTask implements Messenger.Task { + private final Message msg; + SequencedSendTask(Message msg) { this.msg = msg; } + @Override public void run() { sequencedSend(msg); } + @Override public void destroy() { } + } + + private void sendNextInSequence(long seqId) { Message msg = null; synchronized (this) { Queue<Message> queue = seqMap.get(seqId); @@ -147,10 +162,16 @@ public class Sequencer implements MessageHandler, ReplyHandler { } } if (msg != null) { - sequencedSend(msg); + Boolean alreadySending = isSending.get(); + if (alreadySending && (msn != null)) { + // Dispatch in another thread to break possibly very long recursion. + msn.enqueue(new SequencedSendTask(msg)); + } else { + isSending.set(Boolean.TRUE); + sequencedSend(msg); + } + isSending.set(Boolean.FALSE); } - ReplyHandler handler = reply.popHandler(); - handler.handleReply(reply); } } diff --git a/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java b/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java index 3b53e46956b..75751b1e2ce 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java @@ -48,7 +48,7 @@ public final class SourceSession implements ReplyHandler, MessageBus.SendBlocked */ SourceSession(MessageBus mbus, SourceSessionParams params) { this.mbus = mbus; - sequencer = new Sequencer(mbus); + sequencer = new Sequencer(mbus, mbus.messenger()); if (!params.hasReplyHandler()) { throw new NullPointerException("Reply handler is null."); } diff --git a/messagebus/src/test/java/com/yahoo/messagebus/SequencerTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/SequencerTestCase.java index 4f440acec80..e13d370fe8c 100644 --- a/messagebus/src/test/java/com/yahoo/messagebus/SequencerTestCase.java +++ b/messagebus/src/test/java/com/yahoo/messagebus/SequencerTestCase.java @@ -6,8 +6,17 @@ import org.junit.jupiter.api.Test; import java.util.LinkedList; import java.util.Queue; - -import static org.junit.jupiter.api.Assertions.*; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; /** * @author Simon Thoresen Hult @@ -102,7 +111,48 @@ public class SequencerTestCase { assertEquals(0, dst.size()); } - @SuppressWarnings("serial") + @Test + void testRecursiveSending() throws InterruptedException { + // This test queues up a lot of replies, and then has them all ready to return at once. + int n = 10000; + CountDownLatch latch = new CountDownLatch(n); + AtomicReference<Reply> waiting = new AtomicReference<>(); + Executor executor = Executors.newSingleThreadExecutor(); + MessageHandler sender = message -> { + Runnable task = () -> { + Reply reply = new EmptyReply(); + reply.swapState(message); + reply.setMessage(message); + if ( ! waiting.compareAndSet(null, reply)) reply.popHandler().handleReply(reply); + }; + if (Math.random() < 0.5) executor.execute(task); // Usually, RPC thread runs this. + else task.run(); // But on, e.g., timeouts, it runs in the caller thread instead. + }; + + Queue<Message> answered = new ConcurrentLinkedQueue<>(); + ReplyHandler handler = reply -> { + answered.add(reply.getMessage()); + latch.countDown(); + }; + + Messenger messenger = new Messenger(); + messenger.start(); + Sequencer sequencer = new Sequencer(sender, messenger); // Not using the messenger results in a stack overflow error. + + Queue<Message> sent = new ConcurrentLinkedQueue<>(); + for (int i = 0; i < 10000; i++) { + Message message = new MyMessage(true, 1); + message.pushHandler(handler); + sequencer.handleMessage(message); + sent.add(message); + } + + waiting.get().popHandler().handleReply(waiting.get()); + assertTrue(latch.await(10, TimeUnit.SECONDS), "All messages should obtain a reply within 10s"); + assertEquals(Set.copyOf(sent), Set.copyOf(answered)); // Order is not guaranteed at all! + messenger.destroy(); + } + private static class TestQueue extends LinkedList<Routable> implements ReplyHandler { void checkReply(boolean hasSeqId, long seqId) { |