aboutsummaryrefslogtreecommitdiffstats
path: root/messagebus
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2021-08-17 12:26:46 +0200
committerJon Marius Venstad <venstad@gmail.com>2021-08-17 12:26:46 +0200
commitc31ef822e361eddb82f17a1cdd6baa0fa0677ad4 (patch)
tree670e6505d9c2d7d2c38373c91a05598891552061 /messagebus
parent200c31cd18e10fe88ffaf1b1835a81df2a14880f (diff)
Move conditional broadcasting down
Diffstat (limited to 'messagebus')
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java12
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/network/NetworkMultiplexer.java10
-rw-r--r--messagebus/src/test/java/com/yahoo/messagebus/network/NetworkMultiplexerTest.java15
3 files changed, 18 insertions, 19 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java b/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java
index 818f46bc470..4b674b86aeb 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java
@@ -279,9 +279,7 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler,
}
IntermediateSession session = new IntermediateSession(this, params);
sessions.put(params.getName(), session);
- if (params.getBroadcastName()) {
- net.registerSession(params.getName(), this);
- }
+ net.registerSession(params.getName(), this, params.getBroadcastName());
return session;
}
@@ -322,9 +320,7 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler,
}
DestinationSession session = new DestinationSession(this, params);
sessions.put(params.getName(), session);
- if (params.getBroadcastName()) {
- net.registerSession(params.getName(), this);
- }
+ net.registerSession(params.getName(), this, params.getBroadcastName());
return session;
}
@@ -337,9 +333,7 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler,
* @param broadcastName Whether or not session name was broadcast.
*/
public synchronized void unregisterSession(String name, boolean broadcastName) {
- if (broadcastName) {
- net.unregisterSession(name, this);
- }
+ net.unregisterSession(name, this, broadcastName);
sessions.remove(name);
}
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/NetworkMultiplexer.java b/messagebus/src/main/java/com/yahoo/messagebus/network/NetworkMultiplexer.java
index 252952e058c..4ebee093858 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/network/NetworkMultiplexer.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/network/NetworkMultiplexer.java
@@ -50,11 +50,12 @@ public class NetworkMultiplexer implements NetworkOwner {
return new NetworkMultiplexer(net, false);
}
- public void registerSession(String session, NetworkOwner owner) {
+ public void registerSession(String session, NetworkOwner owner, boolean broadcast) {
sessions.compute(session, (name, owners) -> {
if (owners == null) {
owners = new ConcurrentLinkedQueue<>();
- net.registerSession(session);
+ if (broadcast)
+ net.registerSession(session);
}
else if (owners.contains(owner))
throw new IllegalArgumentException("Session '" + session + "' with owner '" + owner + "' already registered with this");
@@ -64,13 +65,14 @@ public class NetworkMultiplexer implements NetworkOwner {
});
}
- public void unregisterSession(String session, NetworkOwner owner) {
+ public void unregisterSession(String session, NetworkOwner owner, boolean broadcast) {
sessions.compute(session, (name, owners) -> {
if (owners == null || ! owners.remove(owner))
throw new IllegalArgumentException("Session '" + session + "' not registered with owner '" + owner + "'");
if (owners.isEmpty()) {
- net.unregisterSession(session);
+ if (broadcast)
+ net.unregisterSession(session);
return null;
}
return owners;
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/network/NetworkMultiplexerTest.java b/messagebus/src/test/java/com/yahoo/messagebus/network/NetworkMultiplexerTest.java
index 808a0e95585..544b4e84899 100644
--- a/messagebus/src/test/java/com/yahoo/messagebus/network/NetworkMultiplexerTest.java
+++ b/messagebus/src/test/java/com/yahoo/messagebus/network/NetworkMultiplexerTest.java
@@ -40,9 +40,9 @@ public class NetworkMultiplexerTest {
assertFalse(net.shutDown.get());
shared.attach(owner1);
- shared.registerSession("s1", owner1);
+ shared.registerSession("s1", owner1, true);
try {
- shared.registerSession("s1", owner1);
+ shared.registerSession("s1", owner1, true);
fail("Illegal to register same session multiple times with the same owner");
}
catch (IllegalArgumentException expected) {
@@ -51,8 +51,9 @@ public class NetworkMultiplexerTest {
assertEquals(Set.of("s1"), net.registered);
shared.attach(owner2);
- shared.registerSession("s1", owner2);
- shared.registerSession("s2", owner2);
+ shared.registerSession("s1", owner2, true);
+ shared.registerSession("s2", owner2, true);
+ shared.registerSession("s3", owner2, false);
assertEquals(Set.of("s1", "s2"), net.registered);
Utf8String name = new Utf8String("protocol");
@@ -65,12 +66,14 @@ public class NetworkMultiplexerTest {
Message message1 = new SimpleMessage("one");
Message message2 = new SimpleMessage("two");
Message message3 = new SimpleMessage("three");
+ Message message4 = new SimpleMessage("four");
shared.deliverMessage(message1, "s1");
shared.deliverMessage(message2, "s2");
- shared.unregisterSession("s1", owner1);
+ shared.unregisterSession("s1", owner1, true);
shared.deliverMessage(message3, "s1");
+ shared.deliverMessage(message4, "s3");
assertEquals(Map.of("s1", List.of(message1)), owner1.messages);
- assertEquals(Map.of("s2", List.of(message2), "s1", List.of(message3)), owner2.messages);
+ assertEquals(Map.of("s2", List.of(message2), "s1", List.of(message3), "s3", List.of(message4)), owner2.messages);
shared.detach(owner1);
assertEquals(protocol2, shared.getProtocol(name));