From 6606a8d38c25db4180317cb7aa31fa5b9b5c4815 Mon Sep 17 00:00:00 2001 From: jonmv Date: Fri, 13 Oct 2023 17:25:45 +0200 Subject: Test sequencer with messenger and deep reply->send-next recursion --- .../main/java/com/yahoo/messagebus/Sequencer.java | 2 +- .../com/yahoo/messagebus/SequencerTestCase.java | 46 +++++++++++++++++++++- 2 files changed, 46 insertions(+), 2 deletions(-) (limited to 'messagebus/src') diff --git a/messagebus/src/main/java/com/yahoo/messagebus/Sequencer.java b/messagebus/src/main/java/com/yahoo/messagebus/Sequencer.java index 2244559b54a..05e71e0f006 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/Sequencer.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/Sequencer.java @@ -148,7 +148,7 @@ public class Sequencer implements MessageHandler, ReplyHandler { private final Message msg; SequencedSendTask(Message msg) { this.msg = msg; } @Override public void run() { sequencedSend(msg); } - @Override public void destroy() { msg.discard(); } + @Override public void destroy() { } } private void sendNextInSequence(long seqId) { diff --git a/messagebus/src/test/java/com/yahoo/messagebus/SequencerTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/SequencerTestCase.java index f06ff4f5f73..493afe4f12d 100644 --- a/messagebus/src/test/java/com/yahoo/messagebus/SequencerTestCase.java +++ b/messagebus/src/test/java/com/yahoo/messagebus/SequencerTestCase.java @@ -6,8 +6,15 @@ import org.junit.jupiter.api.Test; import java.util.LinkedList; import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; -import static org.junit.jupiter.api.Assertions.*; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; /** * @author Simon Thoresen Hult @@ -102,6 +109,43 @@ public class SequencerTestCase { assertEquals(0, dst.size()); } + @Test + void testRecursiveSending() throws InterruptedException { + // This test queues up a lot of replies, and then has them all ready to return at once. + int n = 10000; + CountDownLatch latch = new CountDownLatch(n); + AtomicReference waiting = new AtomicReference<>(); + MessageHandler sender = message -> { + Reply reply = new EmptyReply(); + reply.swapState(message); + reply.setMessage(message); + if ( ! waiting.compareAndSet(null, reply)) reply.popHandler().handleReply(reply); + }; + + Queue answered = new ConcurrentLinkedQueue<>(); + ReplyHandler handler = reply -> { + answered.add(reply.getMessage()); + latch.countDown(); + }; + + Messenger messenger = new Messenger(); + messenger.start(); + Sequencer sequencer = new Sequencer(sender, messenger); + + Queue sent = new ConcurrentLinkedQueue<>(); + for (int i = 0; i < 10000; i++) { + Message message = new MyMessage(true, 1); + message.pushHandler(handler); + sequencer.handleMessage(message); + sent.add(message); + } + + waiting.get().popHandler().handleReply(waiting.get()); + assertTrue(latch.await(10, TimeUnit.SECONDS), "All messages should obtain a reply within 10s"); + assertEquals(Set.copyOf(sent), Set.copyOf(answered)); // Order is not guaranteed, but typically something like 2, 1, 4, 3, 6, 5, ... + messenger.destroy(); + } + private static class TestQueue extends LinkedList implements ReplyHandler { void checkReply(boolean hasSeqId, long seqId) { -- cgit v1.2.3