diff options
author | Jon Marius Venstad <venstad@gmail.com> | 2021-08-23 10:14:03 +0200 |
---|---|---|
committer | Jon Marius Venstad <venstad@gmail.com> | 2021-08-23 10:14:03 +0200 |
commit | 26d36def2b74166bda5000660ad8a9467f57e728 (patch) | |
tree | 59ba2afaaecdf612c1aed82aef153eb3b9f6ead6 | |
parent | 43402b23dc74fe6550d231b26e8c50327e1971f9 (diff) |
Let server session.connect() conncet to net, and use newest destination
No practical changes, but less API misuse. @bjorncs please review.
12 files changed, 91 insertions, 30 deletions
diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusServer.java b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusServer.java index a2131f22dc4..67badddddd2 100644 --- a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusServer.java +++ b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusServer.java @@ -43,6 +43,7 @@ public final class MbusServer extends AbstractResource implements ServerProvider @Override public void start() { log.log(Level.FINE, "Starting message bus server."); + session.connect(); running.set(true); } @@ -50,7 +51,6 @@ public final class MbusServer extends AbstractResource implements ServerProvider public void close() { log.log(Level.FINE, "Closing message bus server."); running.set(false); - session.close(); } @Override diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/shared/ServerSession.java b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/ServerSession.java index 4cd4a776292..9bab25ae87d 100644 --- a/container-messagebus/src/main/java/com/yahoo/messagebus/shared/ServerSession.java +++ b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/ServerSession.java @@ -2,13 +2,14 @@ package com.yahoo.messagebus.shared; import com.yahoo.jdisc.SharedResource; +import com.yahoo.messagebus.Connectable; import com.yahoo.messagebus.MessageHandler; import com.yahoo.messagebus.Reply; /** * @author Simon Thoresen Hult */ -public interface ServerSession extends SharedResource { +public interface ServerSession extends SharedResource, Connectable { MessageHandler getMessageHandler(); @@ -20,6 +21,4 @@ public interface ServerSession extends SharedResource { String name(); - void close(); - } diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedDestinationSession.java b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedDestinationSession.java index 5a9cd39c5b4..7a39005cd48 100644 --- a/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedDestinationSession.java +++ b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedDestinationSession.java @@ -28,7 +28,7 @@ public class SharedDestinationSession extends AbstractResource implements Messag SharedDestinationSession(SharedMessageBus mbus, DestinationSessionParams params) { this.msgHandler.set(params.getMessageHandler()); - this.session = mbus.messageBus().createDestinationSession(params.setMessageHandler(this)); + this.session = mbus.messageBus().createDetachedDestinationSession(params.setMessageHandler(this)); this.mbusReference = mbus.refer(); } @@ -77,8 +77,8 @@ public class SharedDestinationSession extends AbstractResource implements Messag } @Override - public void close() { - session.destroy(); + public void connect() { + session.connect(); } @Override diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedIntermediateSession.java b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedIntermediateSession.java index 64cc1aaf510..cb35a0a42f1 100644 --- a/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedIntermediateSession.java +++ b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedIntermediateSession.java @@ -35,8 +35,8 @@ public class SharedIntermediateSession extends AbstractResource throw new IllegalArgumentException("Reply handler must be null."); } this.msgHandler.set(params.getMessageHandler()); - this.session = mbus.messageBus().createIntermediateSession(params.setReplyHandler(this) - .setMessageHandler(this)); + this.session = mbus.messageBus().createDetachedIntermediateSession(params.setReplyHandler(this) + .setMessageHandler(this)); this.mbusReference = mbus.refer(); } @@ -96,8 +96,8 @@ public class SharedIntermediateSession extends AbstractResource } @Override - public void close() { - session.destroy(); + public void connect() { + session.connect(); } @Override diff --git a/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusServerTestCase.java b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusServerTestCase.java index 6ebb41c4ab7..a251cddcbdf 100644 --- a/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusServerTestCase.java +++ b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusServerTestCase.java @@ -352,7 +352,7 @@ public class MbusServerTestCase { } @Override - public void close() { } + public void connect() { } @Override public ResourceReference refer() { diff --git a/messagebus/abi-spec.json b/messagebus/abi-spec.json index 85c1eaa4f09..623904bef8d 100644 --- a/messagebus/abi-spec.json +++ b/messagebus/abi-spec.json @@ -64,9 +64,23 @@ ], "fields": [] }, + "com.yahoo.messagebus.Connectable": { + "superClass": "java.lang.Object", + "interfaces": [], + "attributes": [ + "public", + "interface", + "abstract" + ], + "methods": [ + "public abstract void connect()" + ], + "fields": [] + }, "com.yahoo.messagebus.DestinationSession": { "superClass": "java.lang.Object", "interfaces": [ + "com.yahoo.messagebus.Connectable", "com.yahoo.messagebus.MessageHandler" ], "attributes": [ @@ -81,7 +95,8 @@ "public com.yahoo.messagebus.MessageHandler getMessageHandler()", "public java.lang.String getConnectionSpec()", "public java.lang.String getName()", - "public void handleMessage(com.yahoo.messagebus.Message)" + "public void handleMessage(com.yahoo.messagebus.Message)", + "public void connect()" ], "fields": [] }, @@ -213,7 +228,8 @@ "superClass": "java.lang.Object", "interfaces": [ "com.yahoo.messagebus.MessageHandler", - "com.yahoo.messagebus.ReplyHandler" + "com.yahoo.messagebus.ReplyHandler", + "com.yahoo.messagebus.Connectable" ], "attributes": [ "public", @@ -228,7 +244,8 @@ "public java.lang.String getConnectionSpec()", "public java.lang.String getName()", "public void handleMessage(com.yahoo.messagebus.Message)", - "public void handleReply(com.yahoo.messagebus.Reply)" + "public void handleReply(com.yahoo.messagebus.Reply)", + "public void connect()" ], "fields": [] }, @@ -319,8 +336,11 @@ "public com.yahoo.messagebus.SourceSession createSourceSession(com.yahoo.messagebus.SourceSessionParams)", "public com.yahoo.messagebus.IntermediateSession createIntermediateSession(java.lang.String, boolean, com.yahoo.messagebus.MessageHandler, com.yahoo.messagebus.ReplyHandler)", "public synchronized com.yahoo.messagebus.IntermediateSession createIntermediateSession(com.yahoo.messagebus.IntermediateSessionParams)", + "public synchronized com.yahoo.messagebus.IntermediateSession createDetachedIntermediateSession(com.yahoo.messagebus.IntermediateSessionParams)", "public com.yahoo.messagebus.DestinationSession createDestinationSession(java.lang.String, boolean, com.yahoo.messagebus.MessageHandler)", "public synchronized com.yahoo.messagebus.DestinationSession createDestinationSession(com.yahoo.messagebus.DestinationSessionParams)", + "public synchronized com.yahoo.messagebus.DestinationSession createDetachedDestinationSession(com.yahoo.messagebus.DestinationSessionParams)", + "public void connect(java.lang.String, boolean)", "public synchronized void unregisterSession(java.lang.String, boolean)", "public void handleMessage(com.yahoo.messagebus.Message)", "public void handleReply(com.yahoo.messagebus.Reply)", diff --git a/messagebus/src/main/java/com/yahoo/messagebus/Connectable.java b/messagebus/src/main/java/com/yahoo/messagebus/Connectable.java new file mode 100644 index 00000000000..40629859d5c --- /dev/null +++ b/messagebus/src/main/java/com/yahoo/messagebus/Connectable.java @@ -0,0 +1,13 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus; + +/** + * Something which can be connected to a network when ready to receive incoming requests. + * + * @author jonmv + */ +public interface Connectable { + + void connect(); + +} diff --git a/messagebus/src/main/java/com/yahoo/messagebus/DestinationSession.java b/messagebus/src/main/java/com/yahoo/messagebus/DestinationSession.java index a4b5422d502..484305decdc 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/DestinationSession.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/DestinationSession.java @@ -9,7 +9,7 @@ import java.util.concurrent.atomic.AtomicBoolean; * * @author Simon Thoresen Hult */ -public final class DestinationSession implements MessageHandler { +public final class DestinationSession implements Connectable, MessageHandler { private final AtomicBoolean destroyed = new AtomicBoolean(false); private final String name; @@ -125,4 +125,9 @@ public final class DestinationSession implements MessageHandler { msgHandler.handleMessage(msg); } + @Override + public void connect() { + mbus.connect(name, broadcastName); + } + } diff --git a/messagebus/src/main/java/com/yahoo/messagebus/IntermediateSession.java b/messagebus/src/main/java/com/yahoo/messagebus/IntermediateSession.java index ca772ce6c3a..44215692ca4 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/IntermediateSession.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/IntermediateSession.java @@ -9,7 +9,7 @@ import java.util.concurrent.atomic.AtomicBoolean; * * @author Simon Thoresen Hult */ -public final class IntermediateSession implements MessageHandler, ReplyHandler { +public final class IntermediateSession implements MessageHandler, ReplyHandler, Connectable { private final AtomicBoolean destroyed = new AtomicBoolean(false); private final String name; @@ -114,4 +114,9 @@ public final class IntermediateSession implements MessageHandler, ReplyHandler { } } + @Override + public void connect() { + mbus.connect(name, broadcastName); + } + } diff --git a/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java b/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java index 4b674b86aeb..313a5dccdaf 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java @@ -271,6 +271,12 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler, * @return The created session. */ public synchronized IntermediateSession createIntermediateSession(IntermediateSessionParams params) { + IntermediateSession session = createDetachedIntermediateSession(params); + connect(params.getName(), params.getBroadcastName()); + return session; + } + + public synchronized IntermediateSession createDetachedIntermediateSession(IntermediateSessionParams params) { if (destroyed.get()) { throw new IllegalStateException("Object is destroyed."); } @@ -279,7 +285,6 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler, } IntermediateSession session = new IntermediateSession(this, params); sessions.put(params.getName(), session); - net.registerSession(params.getName(), this, params.getBroadcastName()); return session; } @@ -312,6 +317,12 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler, * @return The created session. */ public synchronized DestinationSession createDestinationSession(DestinationSessionParams params) { + DestinationSession session = createDetachedDestinationSession(params); + connect(params.getName(), params.getBroadcastName()); + return session; + } + + public synchronized DestinationSession createDetachedDestinationSession(DestinationSessionParams params) { if (destroyed.get()) { throw new IllegalStateException("Object is destroyed."); } @@ -320,10 +331,14 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler, } DestinationSession session = new DestinationSession(this, params); sessions.put(params.getName(), session); - net.registerSession(params.getName(), this, params.getBroadcastName()); return session; } + /** Connects the given session to the network, so it will receive requests. */ + public void connect(String session, boolean broadcast) { + net.registerSession(session, this, broadcast); + } + /** * <p>This method is invoked by the {@link * com.yahoo.messagebus.IntermediateSession#destroy()} to unregister diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/NetworkMultiplexer.java b/messagebus/src/main/java/com/yahoo/messagebus/network/NetworkMultiplexer.java index 5e6a8c7688e..e75b5d0934a 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/network/NetworkMultiplexer.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/network/NetworkMultiplexer.java @@ -5,9 +5,11 @@ import com.yahoo.messagebus.Message; import com.yahoo.messagebus.Protocol; import com.yahoo.text.Utf8Array; +import java.util.Deque; import java.util.Map; import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.logging.Logger; @@ -29,8 +31,8 @@ public class NetworkMultiplexer implements NetworkOwner { private static final Logger log = Logger.getLogger(NetworkMultiplexer.class.getName()); private final Network net; - private final Queue<NetworkOwner> owners = new ConcurrentLinkedQueue<>(); - private final Map<String, Queue<NetworkOwner>> sessions = new ConcurrentHashMap<>(); + private final Deque<NetworkOwner> owners = new ConcurrentLinkedDeque<>(); + private final Map<String, Deque<NetworkOwner>> sessions = new ConcurrentHashMap<>(); private final boolean shared; private NetworkMultiplexer(Network net, boolean shared) { @@ -52,23 +54,21 @@ public class NetworkMultiplexer implements NetworkOwner { public void registerSession(String session, NetworkOwner owner, boolean broadcast) { sessions.compute(session, (name, owners) -> { if (owners == null) { - owners = new ConcurrentLinkedQueue<>(); + owners = new ConcurrentLinkedDeque<>(); if (broadcast) net.registerSession(session); } else if (owners.contains(owner)) throw new IllegalArgumentException("Session '" + session + "' with owner '" + owner + "' already registered with this"); - owners.add(owner); + owners.push(owner); return owners; }); } public void unregisterSession(String session, NetworkOwner owner, boolean broadcast) { - sessions.compute(session, (name, owners) -> { - if (owners == null || ! owners.remove(owner)) - throw new IllegalArgumentException("Session '" + session + "' not registered with owner '" + owner + "'"); - + sessions.computeIfPresent(session, (name, owners) -> { + owners.remove(owner); if (owners.isEmpty()) { if (broadcast) net.unregisterSession(session); diff --git a/messagebus/src/test/java/com/yahoo/messagebus/network/NetworkMultiplexerTest.java b/messagebus/src/test/java/com/yahoo/messagebus/network/NetworkMultiplexerTest.java index f1d83550601..3bf754f29c7 100644 --- a/messagebus/src/test/java/com/yahoo/messagebus/network/NetworkMultiplexerTest.java +++ b/messagebus/src/test/java/com/yahoo/messagebus/network/NetworkMultiplexerTest.java @@ -51,7 +51,6 @@ public class NetworkMultiplexerTest { assertEquals(Set.of("s1"), net.registered); shared.attach(owner2); - shared.registerSession("s1", owner2, true); shared.registerSession("s2", owner2, true); shared.registerSession("s3", owner2, false); assertEquals(Set.of("s1", "s2"), net.registered); @@ -68,13 +67,18 @@ public class NetworkMultiplexerTest { Message message2 = new SimpleMessage("two"); Message message3 = new SimpleMessage("three"); Message message4 = new SimpleMessage("four"); + Message message5 = new SimpleMessage("five"); shared.deliverMessage(message1, "s1"); shared.deliverMessage(message2, "s2"); - shared.unregisterSession("s1", owner1, true); + + // New "s1" owner connects, and should have new requests. + shared.registerSession("s1", owner2, true); shared.deliverMessage(message3, "s1"); shared.deliverMessage(message4, "s3"); + shared.unregisterSession("s1", owner1, true); + shared.deliverMessage(message5, "s1"); assertEquals(Map.of("s1", List.of(message1)), owner1.messages); - assertEquals(Map.of("s2", List.of(message2), "s1", List.of(message3), "s3", List.of(message4)), owner2.messages); + assertEquals(Map.of("s2", List.of(message2), "s1", List.of(message3, message5), "s3", List.of(message4)), owner2.messages); shared.detach(owner1); assertEquals(protocol2, shared.getProtocol(name)); |