diff options
author | jonmv <venstad@gmail.com> | 2023-10-13 17:25:45 +0200 |
---|---|---|
committer | jonmv <venstad@gmail.com> | 2023-10-13 17:25:45 +0200 |
commit | 6606a8d38c25db4180317cb7aa31fa5b9b5c4815 (patch) | |
tree | 9195f8d19a5a2f85b802c90cb944a225277cceac /messagebus/src/test | |
parent | 3741bd8da83e2b2b27fd0f746689bb11464576f3 (diff) |
Test sequencer with messenger and deep reply->send-next recursion
Diffstat (limited to 'messagebus/src/test')
-rw-r--r-- | messagebus/src/test/java/com/yahoo/messagebus/SequencerTestCase.java | 46 |
1 files changed, 45 insertions, 1 deletions
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/SequencerTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/SequencerTestCase.java index f06ff4f5f73..493afe4f12d 100644 --- a/messagebus/src/test/java/com/yahoo/messagebus/SequencerTestCase.java +++ b/messagebus/src/test/java/com/yahoo/messagebus/SequencerTestCase.java @@ -6,8 +6,15 @@ import org.junit.jupiter.api.Test; import java.util.LinkedList; import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; -import static org.junit.jupiter.api.Assertions.*; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; /** * @author Simon Thoresen Hult @@ -102,6 +109,43 @@ public class SequencerTestCase { assertEquals(0, dst.size()); } + @Test + void testRecursiveSending() throws InterruptedException { + // This test queues up a lot of replies, and then has them all ready to return at once. + int n = 10000; + CountDownLatch latch = new CountDownLatch(n); + AtomicReference<Reply> waiting = new AtomicReference<>(); + MessageHandler sender = message -> { + Reply reply = new EmptyReply(); + reply.swapState(message); + reply.setMessage(message); + if ( ! waiting.compareAndSet(null, reply)) reply.popHandler().handleReply(reply); + }; + + Queue<Message> answered = new ConcurrentLinkedQueue<>(); + ReplyHandler handler = reply -> { + answered.add(reply.getMessage()); + latch.countDown(); + }; + + Messenger messenger = new Messenger(); + messenger.start(); + Sequencer sequencer = new Sequencer(sender, messenger); + + Queue<Message> sent = new ConcurrentLinkedQueue<>(); + for (int i = 0; i < 10000; i++) { + Message message = new MyMessage(true, 1); + message.pushHandler(handler); + sequencer.handleMessage(message); + sent.add(message); + } + + waiting.get().popHandler().handleReply(waiting.get()); + assertTrue(latch.await(10, TimeUnit.SECONDS), "All messages should obtain a reply within 10s"); + assertEquals(Set.copyOf(sent), Set.copyOf(answered)); // Order is not guaranteed, but typically something like 2, 1, 4, 3, 6, 5, ... + messenger.destroy(); + } + private static class TestQueue extends LinkedList<Routable> implements ReplyHandler { void checkReply(boolean hasSeqId, long seqId) { |