summaryrefslogtreecommitdiffstats
path: root/messagebus
diff options
context:
space:
mode:
authorJon Marius Venstad <jonmv@users.noreply.github.com>2023-10-13 15:03:36 +0200
committerGitHub <noreply@github.com>2023-10-13 15:03:36 +0200
commit91f93f1a5a0e3d1fd9dae2b9a09bd9a92af762fb (patch)
tree1d0be902f144d36cb45c2716e41fbf711e650d69 /messagebus
parent5c782f4ee1cda44785f7eb5d05f5067d011e177f (diff)
Revert "Balder/deliver reply before sending next in sequence"
Diffstat (limited to 'messagebus')
-rw-r--r--messagebus/abi-spec.json1
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java2
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/Sequencer.java39
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java2
-rw-r--r--messagebus/src/test/java/com/yahoo/messagebus/SequencerTestCase.java1
5 files changed, 11 insertions, 34 deletions
diff --git a/messagebus/abi-spec.json b/messagebus/abi-spec.json
index 093b7f3450a..15a24f82f75 100644
--- a/messagebus/abi-spec.json
+++ b/messagebus/abi-spec.json
@@ -800,7 +800,6 @@
"public"
],
"methods" : [
- "public void <init>(com.yahoo.messagebus.MessageHandler, com.yahoo.messagebus.Messenger)",
"public void <init>(com.yahoo.messagebus.MessageHandler)",
"public boolean destroy()",
"public void handleMessage(com.yahoo.messagebus.Message)",
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java b/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java
index eeea999cc14..19b75fcfec4 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java
@@ -185,8 +185,6 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler,
msn.start();
}
- Messenger messenger() { return msn; }
-
/**
* <p>Sets the destroyed flag to true. The very first time this method is
* called, it cleans up all its dependencies. Even if you retain a reference
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/Sequencer.java b/messagebus/src/main/java/com/yahoo/messagebus/Sequencer.java
index 2244559b54a..78ddac5398e 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/Sequencer.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/Sequencer.java
@@ -20,20 +20,14 @@ public class Sequencer implements MessageHandler, ReplyHandler {
private final AtomicBoolean destroyed = new AtomicBoolean(false);
private final MessageHandler sender;
private final Map<Long, Queue<Message>> seqMap = new HashMap<>();
- private final Messenger msn;
- private final static ThreadLocal<Boolean> isSending = ThreadLocal.withInitial(() -> Boolean.FALSE);
/**
* Constructs a new sequencer on top of the given async sender.
*
* @param sender The underlying sender.
*/
- public Sequencer(MessageHandler sender, Messenger msn) {
- this.sender = sender;
- this.msn = msn;
- }
public Sequencer(MessageHandler sender) {
- this(sender, null);
+ this.sender = sender;
}
/**
@@ -73,7 +67,11 @@ public class Sequencer implements MessageHandler, ReplyHandler {
msg.setContext(seqId);
synchronized (this) {
if (seqMap.containsKey(seqId)) {
- Queue<Message> queue = seqMap.computeIfAbsent(seqId, k -> new LinkedList<>());
+ Queue<Message> queue = seqMap.get(seqId);
+ if (queue == null) {
+ queue = new LinkedList<>();
+ seqMap.put(seqId, queue);
+ }
if (msg.getTrace().shouldTrace(TraceLevel.COMPONENT)) {
msg.getTrace().trace(TraceLevel.COMPONENT,
"Sequencer queued message with sequence id '" + seqId + "'.");
@@ -139,19 +137,6 @@ public class Sequencer implements MessageHandler, ReplyHandler {
reply.getTrace().trace(TraceLevel.COMPONENT,
"Sequencer received reply with sequence id '" + seqId + "'.");
}
- sendNextInSequence(seqId);
- ReplyHandler handler = reply.popHandler();
- handler.handleReply(reply);
- }
-
- private class SequencedSendTask implements Messenger.Task {
- private final Message msg;
- SequencedSendTask(Message msg) { this.msg = msg; }
- @Override public void run() { sequencedSend(msg); }
- @Override public void destroy() { msg.discard(); }
- }
-
- private void sendNextInSequence(long seqId) {
Message msg = null;
synchronized (this) {
Queue<Message> queue = seqMap.get(seqId);
@@ -162,16 +147,10 @@ public class Sequencer implements MessageHandler, ReplyHandler {
}
}
if (msg != null) {
- Boolean alreadySending = isSending.get();
- if (alreadySending && (msn != null)) {
- // Dispatch in another thread to break possibly very long recursion.
- msn.enqueue(new SequencedSendTask(msg));
- } else {
- isSending.set(Boolean.TRUE);
- sequencedSend(msg);
- }
- isSending.set(Boolean.FALSE);
+ sequencedSend(msg);
}
+ ReplyHandler handler = reply.popHandler();
+ handler.handleReply(reply);
}
}
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java b/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java
index 75751b1e2ce..3b53e46956b 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java
@@ -48,7 +48,7 @@ public final class SourceSession implements ReplyHandler, MessageBus.SendBlocked
*/
SourceSession(MessageBus mbus, SourceSessionParams params) {
this.mbus = mbus;
- sequencer = new Sequencer(mbus, mbus.messenger());
+ sequencer = new Sequencer(mbus);
if (!params.hasReplyHandler()) {
throw new NullPointerException("Reply handler is null.");
}
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/SequencerTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/SequencerTestCase.java
index f06ff4f5f73..4f440acec80 100644
--- a/messagebus/src/test/java/com/yahoo/messagebus/SequencerTestCase.java
+++ b/messagebus/src/test/java/com/yahoo/messagebus/SequencerTestCase.java
@@ -102,6 +102,7 @@ public class SequencerTestCase {
assertEquals(0, dst.size());
}
+ @SuppressWarnings("serial")
private static class TestQueue extends LinkedList<Routable> implements ReplyHandler {
void checkReply(boolean hasSeqId, long seqId) {