diff options
3 files changed, 18 insertions, 19 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java b/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java index 818f46bc470..4b674b86aeb 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java @@ -279,9 +279,7 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler, } IntermediateSession session = new IntermediateSession(this, params); sessions.put(params.getName(), session); - if (params.getBroadcastName()) { - net.registerSession(params.getName(), this); - } + net.registerSession(params.getName(), this, params.getBroadcastName()); return session; } @@ -322,9 +320,7 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler, } DestinationSession session = new DestinationSession(this, params); sessions.put(params.getName(), session); - if (params.getBroadcastName()) { - net.registerSession(params.getName(), this); - } + net.registerSession(params.getName(), this, params.getBroadcastName()); return session; } @@ -337,9 +333,7 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler, * @param broadcastName Whether or not session name was broadcast. */ public synchronized void unregisterSession(String name, boolean broadcastName) { - if (broadcastName) { - net.unregisterSession(name, this); - } + net.unregisterSession(name, this, broadcastName); sessions.remove(name); } diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/NetworkMultiplexer.java b/messagebus/src/main/java/com/yahoo/messagebus/network/NetworkMultiplexer.java index 252952e058c..4ebee093858 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/network/NetworkMultiplexer.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/network/NetworkMultiplexer.java @@ -50,11 +50,12 @@ public class NetworkMultiplexer implements NetworkOwner { return new NetworkMultiplexer(net, false); } - public void registerSession(String session, NetworkOwner owner) { + public void registerSession(String session, NetworkOwner owner, boolean broadcast) { sessions.compute(session, (name, owners) -> { if (owners == null) { owners = new ConcurrentLinkedQueue<>(); - net.registerSession(session); + if (broadcast) + net.registerSession(session); } else if (owners.contains(owner)) throw new IllegalArgumentException("Session '" + session + "' with owner '" + owner + "' already registered with this"); @@ -64,13 +65,14 @@ public class NetworkMultiplexer implements NetworkOwner { }); } - public void unregisterSession(String session, NetworkOwner owner) { + public void unregisterSession(String session, NetworkOwner owner, boolean broadcast) { sessions.compute(session, (name, owners) -> { if (owners == null || ! owners.remove(owner)) throw new IllegalArgumentException("Session '" + session + "' not registered with owner '" + owner + "'"); if (owners.isEmpty()) { - net.unregisterSession(session); + if (broadcast) + net.unregisterSession(session); return null; } return owners; diff --git a/messagebus/src/test/java/com/yahoo/messagebus/network/NetworkMultiplexerTest.java b/messagebus/src/test/java/com/yahoo/messagebus/network/NetworkMultiplexerTest.java index 808a0e95585..544b4e84899 100644 --- a/messagebus/src/test/java/com/yahoo/messagebus/network/NetworkMultiplexerTest.java +++ b/messagebus/src/test/java/com/yahoo/messagebus/network/NetworkMultiplexerTest.java @@ -40,9 +40,9 @@ public class NetworkMultiplexerTest { assertFalse(net.shutDown.get()); shared.attach(owner1); - shared.registerSession("s1", owner1); + shared.registerSession("s1", owner1, true); try { - shared.registerSession("s1", owner1); + shared.registerSession("s1", owner1, true); fail("Illegal to register same session multiple times with the same owner"); } catch (IllegalArgumentException expected) { @@ -51,8 +51,9 @@ public class NetworkMultiplexerTest { assertEquals(Set.of("s1"), net.registered); shared.attach(owner2); - shared.registerSession("s1", owner2); - shared.registerSession("s2", owner2); + shared.registerSession("s1", owner2, true); + shared.registerSession("s2", owner2, true); + shared.registerSession("s3", owner2, false); assertEquals(Set.of("s1", "s2"), net.registered); Utf8String name = new Utf8String("protocol"); @@ -65,12 +66,14 @@ public class NetworkMultiplexerTest { Message message1 = new SimpleMessage("one"); Message message2 = new SimpleMessage("two"); Message message3 = new SimpleMessage("three"); + Message message4 = new SimpleMessage("four"); shared.deliverMessage(message1, "s1"); shared.deliverMessage(message2, "s2"); - shared.unregisterSession("s1", owner1); + shared.unregisterSession("s1", owner1, true); shared.deliverMessage(message3, "s1"); + shared.deliverMessage(message4, "s3"); assertEquals(Map.of("s1", List.of(message1)), owner1.messages); - assertEquals(Map.of("s2", List.of(message2), "s1", List.of(message3)), owner2.messages); + assertEquals(Map.of("s2", List.of(message2), "s1", List.of(message3), "s3", List.of(message4)), owner2.messages); shared.detach(owner1); assertEquals(protocol2, shared.getProtocol(name)); |