diff options
Diffstat (limited to 'messagebus/src/test/java/com/yahoo/messagebus/SequencerTestCase.java')
-rw-r--r-- | messagebus/src/test/java/com/yahoo/messagebus/SequencerTestCase.java | 61 |
1 files changed, 57 insertions, 4 deletions
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/SequencerTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/SequencerTestCase.java index c3166a578b5..b077de80467 100644 --- a/messagebus/src/test/java/com/yahoo/messagebus/SequencerTestCase.java +++ b/messagebus/src/test/java/com/yahoo/messagebus/SequencerTestCase.java @@ -1,4 +1,4 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.messagebus; import com.yahoo.messagebus.test.SimpleMessage; @@ -6,8 +6,17 @@ import org.junit.jupiter.api.Test; import java.util.LinkedList; import java.util.Queue; - -import static org.junit.jupiter.api.Assertions.*; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; /** * @author Simon Thoresen Hult @@ -102,7 +111,51 @@ public class SequencerTestCase { assertEquals(0, dst.size()); } - @SuppressWarnings("serial") + @Test + void testRecursiveSending() throws InterruptedException { + // This test queues up a lot of replies, and then has them all ready to return at once. + int n = 10000; + CountDownLatch latch = new CountDownLatch(n); + CountDownLatch started = new CountDownLatch(1); + AtomicReference<Reply> waiting = new AtomicReference<>(); + Executor executor = Executors.newSingleThreadExecutor(); + MessageHandler sender = message -> { + Runnable task = () -> { + Reply reply = new EmptyReply(); + reply.swapState(message); + reply.setMessage(message); + if (waiting.compareAndSet(null, reply)) started.countDown(); + else reply.popHandler().handleReply(reply); + }; + if (Math.random() < 0.5) executor.execute(task); // Usually, RPC thread runs this. + else task.run(); // But on, e.g., timeouts, it runs in the caller thread instead. + }; + + Queue<Message> answered = new ConcurrentLinkedQueue<>(); + ReplyHandler handler = reply -> { + answered.add(reply.getMessage()); + latch.countDown(); + }; + + Messenger messenger = new Messenger(); + messenger.start(); + Sequencer sequencer = new Sequencer(sender, messenger); // Not using the messenger results in a stack overflow error. + + Queue<Message> sent = new ConcurrentLinkedQueue<>(); + for (int i = 0; i < 10000; i++) { + Message message = new MyMessage(true, 1); + message.pushHandler(handler); + sequencer.handleMessage(message); + sent.add(message); + } + + assertTrue(started.await(10, TimeUnit.SECONDS)); + waiting.get().popHandler().handleReply(waiting.get()); + assertTrue(latch.await(10, TimeUnit.SECONDS), "All messages should obtain a reply within 10s"); + assertEquals(Set.copyOf(sent), Set.copyOf(answered)); // Order is not guaranteed at all! + messenger.destroy(); + } + private static class TestQueue extends LinkedList<Routable> implements ReplyHandler { void checkReply(boolean hasSeqId, long seqId) { |