diff options
author | Jon Marius Venstad <venstad@gmail.com> | 2021-08-23 10:14:03 +0200 |
---|---|---|
committer | Jon Marius Venstad <venstad@gmail.com> | 2021-08-23 10:14:03 +0200 |
commit | 26d36def2b74166bda5000660ad8a9467f57e728 (patch) | |
tree | 59ba2afaaecdf612c1aed82aef153eb3b9f6ead6 /messagebus | |
parent | 43402b23dc74fe6550d231b26e8c50327e1971f9 (diff) |
Let server session.connect() conncet to net, and use newest destination
No practical changes, but less API misuse. @bjorncs please review.
Diffstat (limited to 'messagebus')
7 files changed, 80 insertions, 18 deletions
diff --git a/messagebus/abi-spec.json b/messagebus/abi-spec.json index 85c1eaa4f09..623904bef8d 100644 --- a/messagebus/abi-spec.json +++ b/messagebus/abi-spec.json @@ -64,9 +64,23 @@ ], "fields": [] }, + "com.yahoo.messagebus.Connectable": { + "superClass": "java.lang.Object", + "interfaces": [], + "attributes": [ + "public", + "interface", + "abstract" + ], + "methods": [ + "public abstract void connect()" + ], + "fields": [] + }, "com.yahoo.messagebus.DestinationSession": { "superClass": "java.lang.Object", "interfaces": [ + "com.yahoo.messagebus.Connectable", "com.yahoo.messagebus.MessageHandler" ], "attributes": [ @@ -81,7 +95,8 @@ "public com.yahoo.messagebus.MessageHandler getMessageHandler()", "public java.lang.String getConnectionSpec()", "public java.lang.String getName()", - "public void handleMessage(com.yahoo.messagebus.Message)" + "public void handleMessage(com.yahoo.messagebus.Message)", + "public void connect()" ], "fields": [] }, @@ -213,7 +228,8 @@ "superClass": "java.lang.Object", "interfaces": [ "com.yahoo.messagebus.MessageHandler", - "com.yahoo.messagebus.ReplyHandler" + "com.yahoo.messagebus.ReplyHandler", + "com.yahoo.messagebus.Connectable" ], "attributes": [ "public", @@ -228,7 +244,8 @@ "public java.lang.String getConnectionSpec()", "public java.lang.String getName()", "public void handleMessage(com.yahoo.messagebus.Message)", - "public void handleReply(com.yahoo.messagebus.Reply)" + "public void handleReply(com.yahoo.messagebus.Reply)", + "public void connect()" ], "fields": [] }, @@ -319,8 +336,11 @@ "public com.yahoo.messagebus.SourceSession createSourceSession(com.yahoo.messagebus.SourceSessionParams)", "public com.yahoo.messagebus.IntermediateSession createIntermediateSession(java.lang.String, boolean, com.yahoo.messagebus.MessageHandler, com.yahoo.messagebus.ReplyHandler)", "public synchronized com.yahoo.messagebus.IntermediateSession createIntermediateSession(com.yahoo.messagebus.IntermediateSessionParams)", + "public synchronized com.yahoo.messagebus.IntermediateSession createDetachedIntermediateSession(com.yahoo.messagebus.IntermediateSessionParams)", "public com.yahoo.messagebus.DestinationSession createDestinationSession(java.lang.String, boolean, com.yahoo.messagebus.MessageHandler)", "public synchronized com.yahoo.messagebus.DestinationSession createDestinationSession(com.yahoo.messagebus.DestinationSessionParams)", + "public synchronized com.yahoo.messagebus.DestinationSession createDetachedDestinationSession(com.yahoo.messagebus.DestinationSessionParams)", + "public void connect(java.lang.String, boolean)", "public synchronized void unregisterSession(java.lang.String, boolean)", "public void handleMessage(com.yahoo.messagebus.Message)", "public void handleReply(com.yahoo.messagebus.Reply)", diff --git a/messagebus/src/main/java/com/yahoo/messagebus/Connectable.java b/messagebus/src/main/java/com/yahoo/messagebus/Connectable.java new file mode 100644 index 00000000000..40629859d5c --- /dev/null +++ b/messagebus/src/main/java/com/yahoo/messagebus/Connectable.java @@ -0,0 +1,13 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus; + +/** + * Something which can be connected to a network when ready to receive incoming requests. + * + * @author jonmv + */ +public interface Connectable { + + void connect(); + +} diff --git a/messagebus/src/main/java/com/yahoo/messagebus/DestinationSession.java b/messagebus/src/main/java/com/yahoo/messagebus/DestinationSession.java index a4b5422d502..484305decdc 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/DestinationSession.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/DestinationSession.java @@ -9,7 +9,7 @@ import java.util.concurrent.atomic.AtomicBoolean; * * @author Simon Thoresen Hult */ -public final class DestinationSession implements MessageHandler { +public final class DestinationSession implements Connectable, MessageHandler { private final AtomicBoolean destroyed = new AtomicBoolean(false); private final String name; @@ -125,4 +125,9 @@ public final class DestinationSession implements MessageHandler { msgHandler.handleMessage(msg); } + @Override + public void connect() { + mbus.connect(name, broadcastName); + } + } diff --git a/messagebus/src/main/java/com/yahoo/messagebus/IntermediateSession.java b/messagebus/src/main/java/com/yahoo/messagebus/IntermediateSession.java index ca772ce6c3a..44215692ca4 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/IntermediateSession.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/IntermediateSession.java @@ -9,7 +9,7 @@ import java.util.concurrent.atomic.AtomicBoolean; * * @author Simon Thoresen Hult */ -public final class IntermediateSession implements MessageHandler, ReplyHandler { +public final class IntermediateSession implements MessageHandler, ReplyHandler, Connectable { private final AtomicBoolean destroyed = new AtomicBoolean(false); private final String name; @@ -114,4 +114,9 @@ public final class IntermediateSession implements MessageHandler, ReplyHandler { } } + @Override + public void connect() { + mbus.connect(name, broadcastName); + } + } diff --git a/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java b/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java index 4b674b86aeb..313a5dccdaf 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java @@ -271,6 +271,12 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler, * @return The created session. */ public synchronized IntermediateSession createIntermediateSession(IntermediateSessionParams params) { + IntermediateSession session = createDetachedIntermediateSession(params); + connect(params.getName(), params.getBroadcastName()); + return session; + } + + public synchronized IntermediateSession createDetachedIntermediateSession(IntermediateSessionParams params) { if (destroyed.get()) { throw new IllegalStateException("Object is destroyed."); } @@ -279,7 +285,6 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler, } IntermediateSession session = new IntermediateSession(this, params); sessions.put(params.getName(), session); - net.registerSession(params.getName(), this, params.getBroadcastName()); return session; } @@ -312,6 +317,12 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler, * @return The created session. */ public synchronized DestinationSession createDestinationSession(DestinationSessionParams params) { + DestinationSession session = createDetachedDestinationSession(params); + connect(params.getName(), params.getBroadcastName()); + return session; + } + + public synchronized DestinationSession createDetachedDestinationSession(DestinationSessionParams params) { if (destroyed.get()) { throw new IllegalStateException("Object is destroyed."); } @@ -320,10 +331,14 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler, } DestinationSession session = new DestinationSession(this, params); sessions.put(params.getName(), session); - net.registerSession(params.getName(), this, params.getBroadcastName()); return session; } + /** Connects the given session to the network, so it will receive requests. */ + public void connect(String session, boolean broadcast) { + net.registerSession(session, this, broadcast); + } + /** * <p>This method is invoked by the {@link * com.yahoo.messagebus.IntermediateSession#destroy()} to unregister diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/NetworkMultiplexer.java b/messagebus/src/main/java/com/yahoo/messagebus/network/NetworkMultiplexer.java index 5e6a8c7688e..e75b5d0934a 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/network/NetworkMultiplexer.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/network/NetworkMultiplexer.java @@ -5,9 +5,11 @@ import com.yahoo.messagebus.Message; import com.yahoo.messagebus.Protocol; import com.yahoo.text.Utf8Array; +import java.util.Deque; import java.util.Map; import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.logging.Logger; @@ -29,8 +31,8 @@ public class NetworkMultiplexer implements NetworkOwner { private static final Logger log = Logger.getLogger(NetworkMultiplexer.class.getName()); private final Network net; - private final Queue<NetworkOwner> owners = new ConcurrentLinkedQueue<>(); - private final Map<String, Queue<NetworkOwner>> sessions = new ConcurrentHashMap<>(); + private final Deque<NetworkOwner> owners = new ConcurrentLinkedDeque<>(); + private final Map<String, Deque<NetworkOwner>> sessions = new ConcurrentHashMap<>(); private final boolean shared; private NetworkMultiplexer(Network net, boolean shared) { @@ -52,23 +54,21 @@ public class NetworkMultiplexer implements NetworkOwner { public void registerSession(String session, NetworkOwner owner, boolean broadcast) { sessions.compute(session, (name, owners) -> { if (owners == null) { - owners = new ConcurrentLinkedQueue<>(); + owners = new ConcurrentLinkedDeque<>(); if (broadcast) net.registerSession(session); } else if (owners.contains(owner)) throw new IllegalArgumentException("Session '" + session + "' with owner '" + owner + "' already registered with this"); - owners.add(owner); + owners.push(owner); return owners; }); } public void unregisterSession(String session, NetworkOwner owner, boolean broadcast) { - sessions.compute(session, (name, owners) -> { - if (owners == null || ! owners.remove(owner)) - throw new IllegalArgumentException("Session '" + session + "' not registered with owner '" + owner + "'"); - + sessions.computeIfPresent(session, (name, owners) -> { + owners.remove(owner); if (owners.isEmpty()) { if (broadcast) net.unregisterSession(session); diff --git a/messagebus/src/test/java/com/yahoo/messagebus/network/NetworkMultiplexerTest.java b/messagebus/src/test/java/com/yahoo/messagebus/network/NetworkMultiplexerTest.java index f1d83550601..3bf754f29c7 100644 --- a/messagebus/src/test/java/com/yahoo/messagebus/network/NetworkMultiplexerTest.java +++ b/messagebus/src/test/java/com/yahoo/messagebus/network/NetworkMultiplexerTest.java @@ -51,7 +51,6 @@ public class NetworkMultiplexerTest { assertEquals(Set.of("s1"), net.registered); shared.attach(owner2); - shared.registerSession("s1", owner2, true); shared.registerSession("s2", owner2, true); shared.registerSession("s3", owner2, false); assertEquals(Set.of("s1", "s2"), net.registered); @@ -68,13 +67,18 @@ public class NetworkMultiplexerTest { Message message2 = new SimpleMessage("two"); Message message3 = new SimpleMessage("three"); Message message4 = new SimpleMessage("four"); + Message message5 = new SimpleMessage("five"); shared.deliverMessage(message1, "s1"); shared.deliverMessage(message2, "s2"); - shared.unregisterSession("s1", owner1, true); + + // New "s1" owner connects, and should have new requests. + shared.registerSession("s1", owner2, true); shared.deliverMessage(message3, "s1"); shared.deliverMessage(message4, "s3"); + shared.unregisterSession("s1", owner1, true); + shared.deliverMessage(message5, "s1"); assertEquals(Map.of("s1", List.of(message1)), owner1.messages); - assertEquals(Map.of("s2", List.of(message2), "s1", List.of(message3), "s3", List.of(message4)), owner2.messages); + assertEquals(Map.of("s2", List.of(message2), "s1", List.of(message3, message5), "s3", List.of(message4)), owner2.messages); shared.detach(owner1); assertEquals(protocol2, shared.getProtocol(name)); |