aboutsummaryrefslogtreecommitdiffstats
path: root/messagebus/src/main/java/com/yahoo/messagebus/Sequencer.java
diff options
context:
space:
mode:
Diffstat (limited to 'messagebus/src/main/java/com/yahoo/messagebus/Sequencer.java')
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/Sequencer.java41
1 files changed, 31 insertions, 10 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/Sequencer.java b/messagebus/src/main/java/com/yahoo/messagebus/Sequencer.java
index 15bc15f88b8..4e10a72c858 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/Sequencer.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/Sequencer.java
@@ -1,4 +1,4 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.messagebus;
import java.util.HashMap;
@@ -20,14 +20,20 @@ public class Sequencer implements MessageHandler, ReplyHandler {
private final AtomicBoolean destroyed = new AtomicBoolean(false);
private final MessageHandler sender;
private final Map<Long, Queue<Message>> seqMap = new HashMap<>();
+ private final Messenger msn;
+ private final static ThreadLocal<Boolean> isSending = ThreadLocal.withInitial(() -> Boolean.FALSE);
/**
* Constructs a new sequencer on top of the given async sender.
*
* @param sender The underlying sender.
*/
- public Sequencer(MessageHandler sender) {
+ public Sequencer(MessageHandler sender, Messenger msn) {
this.sender = sender;
+ this.msn = msn;
+ }
+ public Sequencer(MessageHandler sender) {
+ this(sender, null);
}
/**
@@ -67,11 +73,7 @@ public class Sequencer implements MessageHandler, ReplyHandler {
msg.setContext(seqId);
synchronized (this) {
if (seqMap.containsKey(seqId)) {
- Queue<Message> queue = seqMap.get(seqId);
- if (queue == null) {
- queue = new LinkedList<>();
- seqMap.put(seqId, queue);
- }
+ Queue<Message> queue = seqMap.computeIfAbsent(seqId, k -> new LinkedList<>());
if (msg.getTrace().shouldTrace(TraceLevel.COMPONENT)) {
msg.getTrace().trace(TraceLevel.COMPONENT,
"Sequencer queued message with sequence id '" + seqId + "'.");
@@ -137,6 +139,19 @@ public class Sequencer implements MessageHandler, ReplyHandler {
reply.getTrace().trace(TraceLevel.COMPONENT,
"Sequencer received reply with sequence id '" + seqId + "'.");
}
+ sendNextInSequence(seqId);
+ ReplyHandler handler = reply.popHandler();
+ handler.handleReply(reply);
+ }
+
+ private class SequencedSendTask implements Messenger.Task {
+ private Message msg;
+ SequencedSendTask(Message msg) { this.msg = msg; }
+ @Override public void run() { sequencedSend(msg); msg = null; }
+ @Override public void destroy() { if (msg != null) msg.discard(); }
+ }
+
+ private void sendNextInSequence(long seqId) {
Message msg = null;
synchronized (this) {
Queue<Message> queue = seqMap.get(seqId);
@@ -147,10 +162,16 @@ public class Sequencer implements MessageHandler, ReplyHandler {
}
}
if (msg != null) {
- sequencedSend(msg);
+ Boolean alreadySending = isSending.get();
+ if (alreadySending && (msn != null)) {
+ // Dispatch in another thread to break possibly very long recursion.
+ msn.enqueue(new SequencedSendTask(msg));
+ } else {
+ isSending.set(Boolean.TRUE);
+ sequencedSend(msg);
+ }
+ isSending.set(Boolean.FALSE);
}
- ReplyHandler handler = reply.popHandler();
- handler.handleReply(reply);
}
}