summaryrefslogtreecommitdiffstats
path: root/messagebus
diff options
context:
space:
mode:
authorjonmv <venstad@gmail.com>2023-10-13 18:36:41 +0200
committerjonmv <venstad@gmail.com>2023-10-13 18:36:41 +0200
commit3510956935745871a7774628265b7b9492f6c155 (patch)
tree1e0b545a9e7155c5a201242b4486e6c75faa7cb1 /messagebus
parent6606a8d38c25db4180317cb7aa31fa5b9b5c4815 (diff)
Fix test to fail when sequenced-task discards it on destroy()
Diffstat (limited to 'messagebus')
-rw-r--r--messagebus/src/test/java/com/yahoo/messagebus/SequencerTestCase.java19
1 files changed, 13 insertions, 6 deletions
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/SequencerTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/SequencerTestCase.java
index 493afe4f12d..e13d370fe8c 100644
--- a/messagebus/src/test/java/com/yahoo/messagebus/SequencerTestCase.java
+++ b/messagebus/src/test/java/com/yahoo/messagebus/SequencerTestCase.java
@@ -9,6 +9,8 @@ import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@@ -115,11 +117,16 @@ public class SequencerTestCase {
int n = 10000;
CountDownLatch latch = new CountDownLatch(n);
AtomicReference<Reply> waiting = new AtomicReference<>();
+ Executor executor = Executors.newSingleThreadExecutor();
MessageHandler sender = message -> {
- Reply reply = new EmptyReply();
- reply.swapState(message);
- reply.setMessage(message);
- if ( ! waiting.compareAndSet(null, reply)) reply.popHandler().handleReply(reply);
+ Runnable task = () -> {
+ Reply reply = new EmptyReply();
+ reply.swapState(message);
+ reply.setMessage(message);
+ if ( ! waiting.compareAndSet(null, reply)) reply.popHandler().handleReply(reply);
+ };
+ if (Math.random() < 0.5) executor.execute(task); // Usually, RPC thread runs this.
+ else task.run(); // But on, e.g., timeouts, it runs in the caller thread instead.
};
Queue<Message> answered = new ConcurrentLinkedQueue<>();
@@ -130,7 +137,7 @@ public class SequencerTestCase {
Messenger messenger = new Messenger();
messenger.start();
- Sequencer sequencer = new Sequencer(sender, messenger);
+ Sequencer sequencer = new Sequencer(sender, messenger); // Not using the messenger results in a stack overflow error.
Queue<Message> sent = new ConcurrentLinkedQueue<>();
for (int i = 0; i < 10000; i++) {
@@ -142,7 +149,7 @@ public class SequencerTestCase {
waiting.get().popHandler().handleReply(waiting.get());
assertTrue(latch.await(10, TimeUnit.SECONDS), "All messages should obtain a reply within 10s");
- assertEquals(Set.copyOf(sent), Set.copyOf(answered)); // Order is not guaranteed, but typically something like 2, 1, 4, 3, 6, 5, ...
+ assertEquals(Set.copyOf(sent), Set.copyOf(answered)); // Order is not guaranteed at all!
messenger.destroy();
}