diff options
author | Jon Marius Venstad <venstad@gmail.com> | 2021-08-17 12:26:46 +0200 |
---|---|---|
committer | Jon Marius Venstad <venstad@gmail.com> | 2021-08-17 12:26:46 +0200 |
commit | c31ef822e361eddb82f17a1cdd6baa0fa0677ad4 (patch) | |
tree | 670e6505d9c2d7d2c38373c91a05598891552061 /messagebus | |
parent | 200c31cd18e10fe88ffaf1b1835a81df2a14880f (diff) |
Move conditional broadcasting down
Diffstat (limited to 'messagebus')
3 files changed, 18 insertions, 19 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java b/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java index 818f46bc470..4b674b86aeb 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java @@ -279,9 +279,7 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler, } IntermediateSession session = new IntermediateSession(this, params); sessions.put(params.getName(), session); - if (params.getBroadcastName()) { - net.registerSession(params.getName(), this); - } + net.registerSession(params.getName(), this, params.getBroadcastName()); return session; } @@ -322,9 +320,7 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler, } DestinationSession session = new DestinationSession(this, params); sessions.put(params.getName(), session); - if (params.getBroadcastName()) { - net.registerSession(params.getName(), this); - } + net.registerSession(params.getName(), this, params.getBroadcastName()); return session; } @@ -337,9 +333,7 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler, * @param broadcastName Whether or not session name was broadcast. */ public synchronized void unregisterSession(String name, boolean broadcastName) { - if (broadcastName) { - net.unregisterSession(name, this); - } + net.unregisterSession(name, this, broadcastName); sessions.remove(name); } diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/NetworkMultiplexer.java b/messagebus/src/main/java/com/yahoo/messagebus/network/NetworkMultiplexer.java index 252952e058c..4ebee093858 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/network/NetworkMultiplexer.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/network/NetworkMultiplexer.java @@ -50,11 +50,12 @@ public class NetworkMultiplexer implements NetworkOwner { return new NetworkMultiplexer(net, false); } - public void registerSession(String session, NetworkOwner owner) { + public void registerSession(String session, NetworkOwner owner, boolean broadcast) { sessions.compute(session, (name, owners) -> { if (owners == null) { owners = new ConcurrentLinkedQueue<>(); - net.registerSession(session); + if (broadcast) + net.registerSession(session); } else if (owners.contains(owner)) throw new IllegalArgumentException("Session '" + session + "' with owner '" + owner + "' already registered with this"); @@ -64,13 +65,14 @@ public class NetworkMultiplexer implements NetworkOwner { }); } - public void unregisterSession(String session, NetworkOwner owner) { + public void unregisterSession(String session, NetworkOwner owner, boolean broadcast) { sessions.compute(session, (name, owners) -> { if (owners == null || ! owners.remove(owner)) throw new IllegalArgumentException("Session '" + session + "' not registered with owner '" + owner + "'"); if (owners.isEmpty()) { - net.unregisterSession(session); + if (broadcast) + net.unregisterSession(session); return null; } return owners; diff --git a/messagebus/src/test/java/com/yahoo/messagebus/network/NetworkMultiplexerTest.java b/messagebus/src/test/java/com/yahoo/messagebus/network/NetworkMultiplexerTest.java index 808a0e95585..544b4e84899 100644 --- a/messagebus/src/test/java/com/yahoo/messagebus/network/NetworkMultiplexerTest.java +++ b/messagebus/src/test/java/com/yahoo/messagebus/network/NetworkMultiplexerTest.java @@ -40,9 +40,9 @@ public class NetworkMultiplexerTest { assertFalse(net.shutDown.get()); shared.attach(owner1); - shared.registerSession("s1", owner1); + shared.registerSession("s1", owner1, true); try { - shared.registerSession("s1", owner1); + shared.registerSession("s1", owner1, true); fail("Illegal to register same session multiple times with the same owner"); } catch (IllegalArgumentException expected) { @@ -51,8 +51,9 @@ public class NetworkMultiplexerTest { assertEquals(Set.of("s1"), net.registered); shared.attach(owner2); - shared.registerSession("s1", owner2); - shared.registerSession("s2", owner2); + shared.registerSession("s1", owner2, true); + shared.registerSession("s2", owner2, true); + shared.registerSession("s3", owner2, false); assertEquals(Set.of("s1", "s2"), net.registered); Utf8String name = new Utf8String("protocol"); @@ -65,12 +66,14 @@ public class NetworkMultiplexerTest { Message message1 = new SimpleMessage("one"); Message message2 = new SimpleMessage("two"); Message message3 = new SimpleMessage("three"); + Message message4 = new SimpleMessage("four"); shared.deliverMessage(message1, "s1"); shared.deliverMessage(message2, "s2"); - shared.unregisterSession("s1", owner1); + shared.unregisterSession("s1", owner1, true); shared.deliverMessage(message3, "s1"); + shared.deliverMessage(message4, "s3"); assertEquals(Map.of("s1", List.of(message1)), owner1.messages); - assertEquals(Map.of("s2", List.of(message2), "s1", List.of(message3)), owner2.messages); + assertEquals(Map.of("s2", List.of(message2), "s1", List.of(message3), "s3", List.of(message4)), owner2.messages); shared.detach(owner1); assertEquals(protocol2, shared.getProtocol(name)); |