From 91f93f1a5a0e3d1fd9dae2b9a09bd9a92af762fb Mon Sep 17 00:00:00 2001 From: Jon Marius Venstad Date: Fri, 13 Oct 2023 15:03:36 +0200 Subject: Revert "Balder/deliver reply before sending next in sequence" --- messagebus/abi-spec.json | 1 - .../main/java/com/yahoo/messagebus/MessageBus.java | 2 -- .../main/java/com/yahoo/messagebus/Sequencer.java | 39 +++++----------------- .../java/com/yahoo/messagebus/SourceSession.java | 2 +- .../com/yahoo/messagebus/SequencerTestCase.java | 1 + 5 files changed, 11 insertions(+), 34 deletions(-) (limited to 'messagebus') diff --git a/messagebus/abi-spec.json b/messagebus/abi-spec.json index 093b7f3450a..15a24f82f75 100644 --- a/messagebus/abi-spec.json +++ b/messagebus/abi-spec.json @@ -800,7 +800,6 @@ "public" ], "methods" : [ - "public void (com.yahoo.messagebus.MessageHandler, com.yahoo.messagebus.Messenger)", "public void (com.yahoo.messagebus.MessageHandler)", "public boolean destroy()", "public void handleMessage(com.yahoo.messagebus.Message)", diff --git a/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java b/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java index eeea999cc14..19b75fcfec4 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java @@ -185,8 +185,6 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler, msn.start(); } - Messenger messenger() { return msn; } - /** *

Sets the destroyed flag to true. The very first time this method is * called, it cleans up all its dependencies. Even if you retain a reference diff --git a/messagebus/src/main/java/com/yahoo/messagebus/Sequencer.java b/messagebus/src/main/java/com/yahoo/messagebus/Sequencer.java index 2244559b54a..78ddac5398e 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/Sequencer.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/Sequencer.java @@ -20,20 +20,14 @@ public class Sequencer implements MessageHandler, ReplyHandler { private final AtomicBoolean destroyed = new AtomicBoolean(false); private final MessageHandler sender; private final Map> seqMap = new HashMap<>(); - private final Messenger msn; - private final static ThreadLocal isSending = ThreadLocal.withInitial(() -> Boolean.FALSE); /** * Constructs a new sequencer on top of the given async sender. * * @param sender The underlying sender. */ - public Sequencer(MessageHandler sender, Messenger msn) { - this.sender = sender; - this.msn = msn; - } public Sequencer(MessageHandler sender) { - this(sender, null); + this.sender = sender; } /** @@ -73,7 +67,11 @@ public class Sequencer implements MessageHandler, ReplyHandler { msg.setContext(seqId); synchronized (this) { if (seqMap.containsKey(seqId)) { - Queue queue = seqMap.computeIfAbsent(seqId, k -> new LinkedList<>()); + Queue queue = seqMap.get(seqId); + if (queue == null) { + queue = new LinkedList<>(); + seqMap.put(seqId, queue); + } if (msg.getTrace().shouldTrace(TraceLevel.COMPONENT)) { msg.getTrace().trace(TraceLevel.COMPONENT, "Sequencer queued message with sequence id '" + seqId + "'."); @@ -139,19 +137,6 @@ public class Sequencer implements MessageHandler, ReplyHandler { reply.getTrace().trace(TraceLevel.COMPONENT, "Sequencer received reply with sequence id '" + seqId + "'."); } - sendNextInSequence(seqId); - ReplyHandler handler = reply.popHandler(); - handler.handleReply(reply); - } - - private class SequencedSendTask implements Messenger.Task { - private final Message msg; - SequencedSendTask(Message msg) { this.msg = msg; } - @Override public void run() { sequencedSend(msg); } - @Override public void destroy() { msg.discard(); } - } - - private void sendNextInSequence(long seqId) { Message msg = null; synchronized (this) { Queue queue = seqMap.get(seqId); @@ -162,16 +147,10 @@ public class Sequencer implements MessageHandler, ReplyHandler { } } if (msg != null) { - Boolean alreadySending = isSending.get(); - if (alreadySending && (msn != null)) { - // Dispatch in another thread to break possibly very long recursion. - msn.enqueue(new SequencedSendTask(msg)); - } else { - isSending.set(Boolean.TRUE); - sequencedSend(msg); - } - isSending.set(Boolean.FALSE); + sequencedSend(msg); } + ReplyHandler handler = reply.popHandler(); + handler.handleReply(reply); } } diff --git a/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java b/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java index 75751b1e2ce..3b53e46956b 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java @@ -48,7 +48,7 @@ public final class SourceSession implements ReplyHandler, MessageBus.SendBlocked */ SourceSession(MessageBus mbus, SourceSessionParams params) { this.mbus = mbus; - sequencer = new Sequencer(mbus, mbus.messenger()); + sequencer = new Sequencer(mbus); if (!params.hasReplyHandler()) { throw new NullPointerException("Reply handler is null."); } diff --git a/messagebus/src/test/java/com/yahoo/messagebus/SequencerTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/SequencerTestCase.java index f06ff4f5f73..4f440acec80 100644 --- a/messagebus/src/test/java/com/yahoo/messagebus/SequencerTestCase.java +++ b/messagebus/src/test/java/com/yahoo/messagebus/SequencerTestCase.java @@ -102,6 +102,7 @@ public class SequencerTestCase { assertEquals(0, dst.size()); } + @SuppressWarnings("serial") private static class TestQueue extends LinkedList implements ReplyHandler { void checkReply(boolean hasSeqId, long seqId) { -- cgit v1.2.3