diff options
author | jonmv <venstad@gmail.com> | 2023-10-13 17:24:57 +0200 |
---|---|---|
committer | jonmv <venstad@gmail.com> | 2023-10-13 17:24:57 +0200 |
commit | 3741bd8da83e2b2b27fd0f746689bb11464576f3 (patch) | |
tree | d56cabea5ce23d58e7915603bf7ac644d711f893 | |
parent | 33ca0c01d65ba52ed0b508912069fee9db79dfcc (diff) |
Revert "Merge pull request #28922 from vespa-engine/revert-28846-balder/deliver-reply-before-sending-next-in-sequence"
This reverts commit 33ca0c01d65ba52ed0b508912069fee9db79dfcc, reversing
changes made to a4515aed1c078f2d521c3eb8de2d327dfa336644.
5 files changed, 34 insertions, 11 deletions
diff --git a/messagebus/abi-spec.json b/messagebus/abi-spec.json index 15a24f82f75..093b7f3450a 100644 --- a/messagebus/abi-spec.json +++ b/messagebus/abi-spec.json @@ -800,6 +800,7 @@ "public" ], "methods" : [ + "public void <init>(com.yahoo.messagebus.MessageHandler, com.yahoo.messagebus.Messenger)", "public void <init>(com.yahoo.messagebus.MessageHandler)", "public boolean destroy()", "public void handleMessage(com.yahoo.messagebus.Message)", diff --git a/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java b/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java index 19b75fcfec4..eeea999cc14 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java @@ -185,6 +185,8 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler, msn.start(); } + Messenger messenger() { return msn; } + /** * <p>Sets the destroyed flag to true. The very first time this method is * called, it cleans up all its dependencies. Even if you retain a reference diff --git a/messagebus/src/main/java/com/yahoo/messagebus/Sequencer.java b/messagebus/src/main/java/com/yahoo/messagebus/Sequencer.java index 78ddac5398e..2244559b54a 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/Sequencer.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/Sequencer.java @@ -20,14 +20,20 @@ public class Sequencer implements MessageHandler, ReplyHandler { private final AtomicBoolean destroyed = new AtomicBoolean(false); private final MessageHandler sender; private final Map<Long, Queue<Message>> seqMap = new HashMap<>(); + private final Messenger msn; + private final static ThreadLocal<Boolean> isSending = ThreadLocal.withInitial(() -> Boolean.FALSE); /** * Constructs a new sequencer on top of the given async sender. * * @param sender The underlying sender. */ - public Sequencer(MessageHandler sender) { + public Sequencer(MessageHandler sender, Messenger msn) { this.sender = sender; + this.msn = msn; + } + public Sequencer(MessageHandler sender) { + this(sender, null); } /** @@ -67,11 +73,7 @@ public class Sequencer implements MessageHandler, ReplyHandler { msg.setContext(seqId); synchronized (this) { if (seqMap.containsKey(seqId)) { - Queue<Message> queue = seqMap.get(seqId); - if (queue == null) { - queue = new LinkedList<>(); - seqMap.put(seqId, queue); - } + Queue<Message> queue = seqMap.computeIfAbsent(seqId, k -> new LinkedList<>()); if (msg.getTrace().shouldTrace(TraceLevel.COMPONENT)) { msg.getTrace().trace(TraceLevel.COMPONENT, "Sequencer queued message with sequence id '" + seqId + "'."); @@ -137,6 +139,19 @@ public class Sequencer implements MessageHandler, ReplyHandler { reply.getTrace().trace(TraceLevel.COMPONENT, "Sequencer received reply with sequence id '" + seqId + "'."); } + sendNextInSequence(seqId); + ReplyHandler handler = reply.popHandler(); + handler.handleReply(reply); + } + + private class SequencedSendTask implements Messenger.Task { + private final Message msg; + SequencedSendTask(Message msg) { this.msg = msg; } + @Override public void run() { sequencedSend(msg); } + @Override public void destroy() { msg.discard(); } + } + + private void sendNextInSequence(long seqId) { Message msg = null; synchronized (this) { Queue<Message> queue = seqMap.get(seqId); @@ -147,10 +162,16 @@ public class Sequencer implements MessageHandler, ReplyHandler { } } if (msg != null) { - sequencedSend(msg); + Boolean alreadySending = isSending.get(); + if (alreadySending && (msn != null)) { + // Dispatch in another thread to break possibly very long recursion. + msn.enqueue(new SequencedSendTask(msg)); + } else { + isSending.set(Boolean.TRUE); + sequencedSend(msg); + } + isSending.set(Boolean.FALSE); } - ReplyHandler handler = reply.popHandler(); - handler.handleReply(reply); } } diff --git a/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java b/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java index 3b53e46956b..75751b1e2ce 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java @@ -48,7 +48,7 @@ public final class SourceSession implements ReplyHandler, MessageBus.SendBlocked */ SourceSession(MessageBus mbus, SourceSessionParams params) { this.mbus = mbus; - sequencer = new Sequencer(mbus); + sequencer = new Sequencer(mbus, mbus.messenger()); if (!params.hasReplyHandler()) { throw new NullPointerException("Reply handler is null."); } diff --git a/messagebus/src/test/java/com/yahoo/messagebus/SequencerTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/SequencerTestCase.java index 4f440acec80..f06ff4f5f73 100644 --- a/messagebus/src/test/java/com/yahoo/messagebus/SequencerTestCase.java +++ b/messagebus/src/test/java/com/yahoo/messagebus/SequencerTestCase.java @@ -102,7 +102,6 @@ public class SequencerTestCase { assertEquals(0, dst.size()); } - @SuppressWarnings("serial") private static class TestQueue extends LinkedList<Routable> implements ReplyHandler { void checkReply(boolean hasSeqId, long seqId) { |