diff options
author | jonmv <venstad@gmail.com> | 2023-10-13 18:36:41 +0200 |
---|---|---|
committer | jonmv <venstad@gmail.com> | 2023-10-13 18:36:41 +0200 |
commit | 3510956935745871a7774628265b7b9492f6c155 (patch) | |
tree | 1e0b545a9e7155c5a201242b4486e6c75faa7cb1 | |
parent | 6606a8d38c25db4180317cb7aa31fa5b9b5c4815 (diff) |
Fix test to fail when sequenced-task discards it on destroy()
-rw-r--r-- | messagebus/src/test/java/com/yahoo/messagebus/SequencerTestCase.java | 19 |
1 files changed, 13 insertions, 6 deletions
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/SequencerTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/SequencerTestCase.java index 493afe4f12d..e13d370fe8c 100644 --- a/messagebus/src/test/java/com/yahoo/messagebus/SequencerTestCase.java +++ b/messagebus/src/test/java/com/yahoo/messagebus/SequencerTestCase.java @@ -9,6 +9,8 @@ import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -115,11 +117,16 @@ public class SequencerTestCase { int n = 10000; CountDownLatch latch = new CountDownLatch(n); AtomicReference<Reply> waiting = new AtomicReference<>(); + Executor executor = Executors.newSingleThreadExecutor(); MessageHandler sender = message -> { - Reply reply = new EmptyReply(); - reply.swapState(message); - reply.setMessage(message); - if ( ! waiting.compareAndSet(null, reply)) reply.popHandler().handleReply(reply); + Runnable task = () -> { + Reply reply = new EmptyReply(); + reply.swapState(message); + reply.setMessage(message); + if ( ! waiting.compareAndSet(null, reply)) reply.popHandler().handleReply(reply); + }; + if (Math.random() < 0.5) executor.execute(task); // Usually, RPC thread runs this. + else task.run(); // But on, e.g., timeouts, it runs in the caller thread instead. }; Queue<Message> answered = new ConcurrentLinkedQueue<>(); @@ -130,7 +137,7 @@ public class SequencerTestCase { Messenger messenger = new Messenger(); messenger.start(); - Sequencer sequencer = new Sequencer(sender, messenger); + Sequencer sequencer = new Sequencer(sender, messenger); // Not using the messenger results in a stack overflow error. Queue<Message> sent = new ConcurrentLinkedQueue<>(); for (int i = 0; i < 10000; i++) { @@ -142,7 +149,7 @@ public class SequencerTestCase { waiting.get().popHandler().handleReply(waiting.get()); assertTrue(latch.await(10, TimeUnit.SECONDS), "All messages should obtain a reply within 10s"); - assertEquals(Set.copyOf(sent), Set.copyOf(answered)); // Order is not guaranteed, but typically something like 2, 1, 4, 3, 6, 5, ... + assertEquals(Set.copyOf(sent), Set.copyOf(answered)); // Order is not guaranteed at all! messenger.destroy(); } |