summaryrefslogtreecommitdiffstats
path: root/messagebus
diff options
context:
space:
mode:
authorjonmv <venstad@gmail.com>2023-10-13 17:25:45 +0200
committerjonmv <venstad@gmail.com>2023-10-13 17:25:45 +0200
commit6606a8d38c25db4180317cb7aa31fa5b9b5c4815 (patch)
tree9195f8d19a5a2f85b802c90cb944a225277cceac /messagebus
parent3741bd8da83e2b2b27fd0f746689bb11464576f3 (diff)
Test sequencer with messenger and deep reply->send-next recursion
Diffstat (limited to 'messagebus')
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/Sequencer.java2
-rw-r--r--messagebus/src/test/java/com/yahoo/messagebus/SequencerTestCase.java46
2 files changed, 46 insertions, 2 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/Sequencer.java b/messagebus/src/main/java/com/yahoo/messagebus/Sequencer.java
index 2244559b54a..05e71e0f006 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/Sequencer.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/Sequencer.java
@@ -148,7 +148,7 @@ public class Sequencer implements MessageHandler, ReplyHandler {
private final Message msg;
SequencedSendTask(Message msg) { this.msg = msg; }
@Override public void run() { sequencedSend(msg); }
- @Override public void destroy() { msg.discard(); }
+ @Override public void destroy() { }
}
private void sendNextInSequence(long seqId) {
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/SequencerTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/SequencerTestCase.java
index f06ff4f5f73..493afe4f12d 100644
--- a/messagebus/src/test/java/com/yahoo/messagebus/SequencerTestCase.java
+++ b/messagebus/src/test/java/com/yahoo/messagebus/SequencerTestCase.java
@@ -6,8 +6,15 @@ import org.junit.jupiter.api.Test;
import java.util.LinkedList;
import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
-import static org.junit.jupiter.api.Assertions.*;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* @author Simon Thoresen Hult
@@ -102,6 +109,43 @@ public class SequencerTestCase {
assertEquals(0, dst.size());
}
+ @Test
+ void testRecursiveSending() throws InterruptedException {
+ // This test queues up a lot of replies, and then has them all ready to return at once.
+ int n = 10000;
+ CountDownLatch latch = new CountDownLatch(n);
+ AtomicReference<Reply> waiting = new AtomicReference<>();
+ MessageHandler sender = message -> {
+ Reply reply = new EmptyReply();
+ reply.swapState(message);
+ reply.setMessage(message);
+ if ( ! waiting.compareAndSet(null, reply)) reply.popHandler().handleReply(reply);
+ };
+
+ Queue<Message> answered = new ConcurrentLinkedQueue<>();
+ ReplyHandler handler = reply -> {
+ answered.add(reply.getMessage());
+ latch.countDown();
+ };
+
+ Messenger messenger = new Messenger();
+ messenger.start();
+ Sequencer sequencer = new Sequencer(sender, messenger);
+
+ Queue<Message> sent = new ConcurrentLinkedQueue<>();
+ for (int i = 0; i < 10000; i++) {
+ Message message = new MyMessage(true, 1);
+ message.pushHandler(handler);
+ sequencer.handleMessage(message);
+ sent.add(message);
+ }
+
+ waiting.get().popHandler().handleReply(waiting.get());
+ assertTrue(latch.await(10, TimeUnit.SECONDS), "All messages should obtain a reply within 10s");
+ assertEquals(Set.copyOf(sent), Set.copyOf(answered)); // Order is not guaranteed, but typically something like 2, 1, 4, 3, 6, 5, ...
+ messenger.destroy();
+ }
+
private static class TestQueue extends LinkedList<Routable> implements ReplyHandler {
void checkReply(boolean hasSeqId, long seqId) {