summaryrefslogtreecommitdiffstats
path: root/messagebus
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2023-10-10 10:32:23 +0200
committerHenning Baldersheim <balder@yahoo-inc.com>2023-10-10 10:32:52 +0200
commita2e0b524d650d395a0486b752c21fe251b382436 (patch)
tree95938a89ad5f5893cb76462c8a7b11ebcfa2f54d /messagebus
parentcdac5165107db05d26832b4459a66ae4098b57e3 (diff)
Get help from the messenger thread to break possibly very long recursion.
Diffstat (limited to 'messagebus')
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java2
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/Sequencer.java25
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java2
-rw-r--r--messagebus/src/test/java/com/yahoo/messagebus/SequencerTestCase.java5
4 files changed, 27 insertions, 7 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java b/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java
index 4b911d7c38e..4a54fa5b9c3 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java
@@ -185,6 +185,8 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler,
msn.start();
}
+ Messenger messenger() { return msn; }
+
/**
* <p>Sets the destroyed flag to true. The very first time this method is
* called, it cleans up all its dependencies. Even if you retain a reference
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/Sequencer.java b/messagebus/src/main/java/com/yahoo/messagebus/Sequencer.java
index 84ef5800143..4382f6cb798 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/Sequencer.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/Sequencer.java
@@ -20,14 +20,18 @@ public class Sequencer implements MessageHandler, ReplyHandler {
private final AtomicBoolean destroyed = new AtomicBoolean(false);
private final MessageHandler sender;
private final Map<Long, Queue<Message>> seqMap = new HashMap<>();
+ private final Messenger msn;
+ private final static Object BUSY = new Object();
+ private final static ThreadLocal<Object> isSending = new ThreadLocal<>();
/**
* Constructs a new sequencer on top of the given async sender.
*
* @param sender The underlying sender.
*/
- public Sequencer(MessageHandler sender) {
+ public Sequencer(MessageHandler sender, Messenger msn) {
this.sender = sender;
+ this.msn = msn;
}
/**
@@ -133,9 +137,16 @@ public class Sequencer implements MessageHandler, ReplyHandler {
reply.getTrace().trace(TraceLevel.COMPONENT,
"Sequencer received reply with sequence id '" + seqId + "'.");
}
+ sendNextInSequence(seqId);
ReplyHandler handler = reply.popHandler();
handler.handleReply(reply);
- sendNextInSequence(seqId);
+ }
+
+ private class SequencedSendTask implements Messenger.Task {
+ private final Message msg;
+ SequencedSendTask(Message msg) { this.msg = msg; }
+ @Override public void run() { sequencedSend(msg); }
+ @Override public void destroy() { msg.discard(); }
}
private void sendNextInSequence(long seqId) {
@@ -149,7 +160,15 @@ public class Sequencer implements MessageHandler, ReplyHandler {
}
}
if (msg != null) {
- sequencedSend(msg);
+ Object alreadySending = isSending.get();
+ if ((alreadySending == BUSY) && (msn != null)) {
+ // Dispatch in another thread to break possibly very long recursion.
+ msn.enqueue(new SequencedSendTask(msg));
+ } else {
+ isSending.set(BUSY);
+ sequencedSend(msg);
+ }
+ isSending.set(null);
}
}
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java b/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java
index 53fd49dcf58..2db8c7655e1 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java
@@ -48,7 +48,7 @@ public final class SourceSession implements ReplyHandler, MessageBus.SendBlocked
*/
SourceSession(MessageBus mbus, SourceSessionParams params) {
this.mbus = mbus;
- sequencer = new Sequencer(mbus);
+ sequencer = new Sequencer(mbus, mbus.messenger());
if (!params.hasReplyHandler()) {
throw new NullPointerException("Reply handler is null.");
}
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/SequencerTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/SequencerTestCase.java
index c3166a578b5..d44bb9235f9 100644
--- a/messagebus/src/test/java/com/yahoo/messagebus/SequencerTestCase.java
+++ b/messagebus/src/test/java/com/yahoo/messagebus/SequencerTestCase.java
@@ -19,7 +19,7 @@ public class SequencerTestCase {
TestQueue src = new TestQueue();
TestQueue dst = new TestQueue();
QueueSender sender = new QueueSender(dst);
- Sequencer seq = new Sequencer(sender);
+ Sequencer seq = new Sequencer(sender, null);
seq.handleMessage(src.createMessage(false, 0));
seq.handleMessage(src.createMessage(false, 0));
@@ -51,7 +51,7 @@ public class SequencerTestCase {
TestQueue src = new TestQueue();
TestQueue dst = new TestQueue();
QueueSender sender = new QueueSender(dst);
- Sequencer seq = new Sequencer(sender);
+ Sequencer seq = new Sequencer(sender, null);
seq.handleMessage(src.createMessage(true, 1L));
seq.handleMessage(src.createMessage(true, 2L));
@@ -102,7 +102,6 @@ public class SequencerTestCase {
assertEquals(0, dst.size());
}
- @SuppressWarnings("serial")
private static class TestQueue extends LinkedList<Routable> implements ReplyHandler {
void checkReply(boolean hasSeqId, long seqId) {