summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJon Bratseth <bratseth@gmail.com>2023-10-13 20:00:49 +0200
committerGitHub <noreply@github.com>2023-10-13 20:00:49 +0200
commit52569f8d6344a6cab365c2150ac9ddf6d86b2ca5 (patch)
tree1e0b545a9e7155c5a201242b4486e6c75faa7cb1
parent33ca0c01d65ba52ed0b508912069fee9db79dfcc (diff)
parent3510956935745871a7774628265b7b9492f6c155 (diff)
Merge pull request #28924 from vespa-engine/jonmv/reapply-balder-break-recusrion
Jonmv/reapply balder break recusrion
-rw-r--r--messagebus/abi-spec.json1
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java2
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/Sequencer.java39
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java2
-rw-r--r--messagebus/src/test/java/com/yahoo/messagebus/SequencerTestCase.java56
5 files changed, 87 insertions, 13 deletions
diff --git a/messagebus/abi-spec.json b/messagebus/abi-spec.json
index 15a24f82f75..093b7f3450a 100644
--- a/messagebus/abi-spec.json
+++ b/messagebus/abi-spec.json
@@ -800,6 +800,7 @@
"public"
],
"methods" : [
+ "public void <init>(com.yahoo.messagebus.MessageHandler, com.yahoo.messagebus.Messenger)",
"public void <init>(com.yahoo.messagebus.MessageHandler)",
"public boolean destroy()",
"public void handleMessage(com.yahoo.messagebus.Message)",
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java b/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java
index 19b75fcfec4..eeea999cc14 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java
@@ -185,6 +185,8 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler,
msn.start();
}
+ Messenger messenger() { return msn; }
+
/**
* <p>Sets the destroyed flag to true. The very first time this method is
* called, it cleans up all its dependencies. Even if you retain a reference
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/Sequencer.java b/messagebus/src/main/java/com/yahoo/messagebus/Sequencer.java
index 78ddac5398e..05e71e0f006 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/Sequencer.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/Sequencer.java
@@ -20,14 +20,20 @@ public class Sequencer implements MessageHandler, ReplyHandler {
private final AtomicBoolean destroyed = new AtomicBoolean(false);
private final MessageHandler sender;
private final Map<Long, Queue<Message>> seqMap = new HashMap<>();
+ private final Messenger msn;
+ private final static ThreadLocal<Boolean> isSending = ThreadLocal.withInitial(() -> Boolean.FALSE);
/**
* Constructs a new sequencer on top of the given async sender.
*
* @param sender The underlying sender.
*/
- public Sequencer(MessageHandler sender) {
+ public Sequencer(MessageHandler sender, Messenger msn) {
this.sender = sender;
+ this.msn = msn;
+ }
+ public Sequencer(MessageHandler sender) {
+ this(sender, null);
}
/**
@@ -67,11 +73,7 @@ public class Sequencer implements MessageHandler, ReplyHandler {
msg.setContext(seqId);
synchronized (this) {
if (seqMap.containsKey(seqId)) {
- Queue<Message> queue = seqMap.get(seqId);
- if (queue == null) {
- queue = new LinkedList<>();
- seqMap.put(seqId, queue);
- }
+ Queue<Message> queue = seqMap.computeIfAbsent(seqId, k -> new LinkedList<>());
if (msg.getTrace().shouldTrace(TraceLevel.COMPONENT)) {
msg.getTrace().trace(TraceLevel.COMPONENT,
"Sequencer queued message with sequence id '" + seqId + "'.");
@@ -137,6 +139,19 @@ public class Sequencer implements MessageHandler, ReplyHandler {
reply.getTrace().trace(TraceLevel.COMPONENT,
"Sequencer received reply with sequence id '" + seqId + "'.");
}
+ sendNextInSequence(seqId);
+ ReplyHandler handler = reply.popHandler();
+ handler.handleReply(reply);
+ }
+
+ private class SequencedSendTask implements Messenger.Task {
+ private final Message msg;
+ SequencedSendTask(Message msg) { this.msg = msg; }
+ @Override public void run() { sequencedSend(msg); }
+ @Override public void destroy() { }
+ }
+
+ private void sendNextInSequence(long seqId) {
Message msg = null;
synchronized (this) {
Queue<Message> queue = seqMap.get(seqId);
@@ -147,10 +162,16 @@ public class Sequencer implements MessageHandler, ReplyHandler {
}
}
if (msg != null) {
- sequencedSend(msg);
+ Boolean alreadySending = isSending.get();
+ if (alreadySending && (msn != null)) {
+ // Dispatch in another thread to break possibly very long recursion.
+ msn.enqueue(new SequencedSendTask(msg));
+ } else {
+ isSending.set(Boolean.TRUE);
+ sequencedSend(msg);
+ }
+ isSending.set(Boolean.FALSE);
}
- ReplyHandler handler = reply.popHandler();
- handler.handleReply(reply);
}
}
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java b/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java
index 3b53e46956b..75751b1e2ce 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java
@@ -48,7 +48,7 @@ public final class SourceSession implements ReplyHandler, MessageBus.SendBlocked
*/
SourceSession(MessageBus mbus, SourceSessionParams params) {
this.mbus = mbus;
- sequencer = new Sequencer(mbus);
+ sequencer = new Sequencer(mbus, mbus.messenger());
if (!params.hasReplyHandler()) {
throw new NullPointerException("Reply handler is null.");
}
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/SequencerTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/SequencerTestCase.java
index 4f440acec80..e13d370fe8c 100644
--- a/messagebus/src/test/java/com/yahoo/messagebus/SequencerTestCase.java
+++ b/messagebus/src/test/java/com/yahoo/messagebus/SequencerTestCase.java
@@ -6,8 +6,17 @@ import org.junit.jupiter.api.Test;
import java.util.LinkedList;
import java.util.Queue;
-
-import static org.junit.jupiter.api.Assertions.*;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* @author Simon Thoresen Hult
@@ -102,7 +111,48 @@ public class SequencerTestCase {
assertEquals(0, dst.size());
}
- @SuppressWarnings("serial")
+ @Test
+ void testRecursiveSending() throws InterruptedException {
+ // This test queues up a lot of replies, and then has them all ready to return at once.
+ int n = 10000;
+ CountDownLatch latch = new CountDownLatch(n);
+ AtomicReference<Reply> waiting = new AtomicReference<>();
+ Executor executor = Executors.newSingleThreadExecutor();
+ MessageHandler sender = message -> {
+ Runnable task = () -> {
+ Reply reply = new EmptyReply();
+ reply.swapState(message);
+ reply.setMessage(message);
+ if ( ! waiting.compareAndSet(null, reply)) reply.popHandler().handleReply(reply);
+ };
+ if (Math.random() < 0.5) executor.execute(task); // Usually, RPC thread runs this.
+ else task.run(); // But on, e.g., timeouts, it runs in the caller thread instead.
+ };
+
+ Queue<Message> answered = new ConcurrentLinkedQueue<>();
+ ReplyHandler handler = reply -> {
+ answered.add(reply.getMessage());
+ latch.countDown();
+ };
+
+ Messenger messenger = new Messenger();
+ messenger.start();
+ Sequencer sequencer = new Sequencer(sender, messenger); // Not using the messenger results in a stack overflow error.
+
+ Queue<Message> sent = new ConcurrentLinkedQueue<>();
+ for (int i = 0; i < 10000; i++) {
+ Message message = new MyMessage(true, 1);
+ message.pushHandler(handler);
+ sequencer.handleMessage(message);
+ sent.add(message);
+ }
+
+ waiting.get().popHandler().handleReply(waiting.get());
+ assertTrue(latch.await(10, TimeUnit.SECONDS), "All messages should obtain a reply within 10s");
+ assertEquals(Set.copyOf(sent), Set.copyOf(answered)); // Order is not guaranteed at all!
+ messenger.destroy();
+ }
+
private static class TestQueue extends LinkedList<Routable> implements ReplyHandler {
void checkReply(boolean hasSeqId, long seqId) {