summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusServer.java2
-rw-r--r--container-messagebus/src/main/java/com/yahoo/messagebus/shared/ServerSession.java5
-rw-r--r--container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedDestinationSession.java6
-rw-r--r--container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedIntermediateSession.java8
-rw-r--r--container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusServerTestCase.java2
-rw-r--r--messagebus/abi-spec.json26
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/Connectable.java13
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/DestinationSession.java7
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/IntermediateSession.java7
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java19
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/network/NetworkMultiplexer.java16
-rw-r--r--messagebus/src/test/java/com/yahoo/messagebus/network/NetworkMultiplexerTest.java10
12 files changed, 91 insertions, 30 deletions
diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusServer.java b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusServer.java
index a2131f22dc4..67badddddd2 100644
--- a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusServer.java
+++ b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusServer.java
@@ -43,6 +43,7 @@ public final class MbusServer extends AbstractResource implements ServerProvider
@Override
public void start() {
log.log(Level.FINE, "Starting message bus server.");
+ session.connect();
running.set(true);
}
@@ -50,7 +51,6 @@ public final class MbusServer extends AbstractResource implements ServerProvider
public void close() {
log.log(Level.FINE, "Closing message bus server.");
running.set(false);
- session.close();
}
@Override
diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/shared/ServerSession.java b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/ServerSession.java
index 4cd4a776292..9bab25ae87d 100644
--- a/container-messagebus/src/main/java/com/yahoo/messagebus/shared/ServerSession.java
+++ b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/ServerSession.java
@@ -2,13 +2,14 @@
package com.yahoo.messagebus.shared;
import com.yahoo.jdisc.SharedResource;
+import com.yahoo.messagebus.Connectable;
import com.yahoo.messagebus.MessageHandler;
import com.yahoo.messagebus.Reply;
/**
* @author Simon Thoresen Hult
*/
-public interface ServerSession extends SharedResource {
+public interface ServerSession extends SharedResource, Connectable {
MessageHandler getMessageHandler();
@@ -20,6 +21,4 @@ public interface ServerSession extends SharedResource {
String name();
- void close();
-
}
diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedDestinationSession.java b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedDestinationSession.java
index 5a9cd39c5b4..7a39005cd48 100644
--- a/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedDestinationSession.java
+++ b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedDestinationSession.java
@@ -28,7 +28,7 @@ public class SharedDestinationSession extends AbstractResource implements Messag
SharedDestinationSession(SharedMessageBus mbus, DestinationSessionParams params) {
this.msgHandler.set(params.getMessageHandler());
- this.session = mbus.messageBus().createDestinationSession(params.setMessageHandler(this));
+ this.session = mbus.messageBus().createDetachedDestinationSession(params.setMessageHandler(this));
this.mbusReference = mbus.refer();
}
@@ -77,8 +77,8 @@ public class SharedDestinationSession extends AbstractResource implements Messag
}
@Override
- public void close() {
- session.destroy();
+ public void connect() {
+ session.connect();
}
@Override
diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedIntermediateSession.java b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedIntermediateSession.java
index 64cc1aaf510..cb35a0a42f1 100644
--- a/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedIntermediateSession.java
+++ b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedIntermediateSession.java
@@ -35,8 +35,8 @@ public class SharedIntermediateSession extends AbstractResource
throw new IllegalArgumentException("Reply handler must be null.");
}
this.msgHandler.set(params.getMessageHandler());
- this.session = mbus.messageBus().createIntermediateSession(params.setReplyHandler(this)
- .setMessageHandler(this));
+ this.session = mbus.messageBus().createDetachedIntermediateSession(params.setReplyHandler(this)
+ .setMessageHandler(this));
this.mbusReference = mbus.refer();
}
@@ -96,8 +96,8 @@ public class SharedIntermediateSession extends AbstractResource
}
@Override
- public void close() {
- session.destroy();
+ public void connect() {
+ session.connect();
}
@Override
diff --git a/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusServerTestCase.java b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusServerTestCase.java
index 6ebb41c4ab7..a251cddcbdf 100644
--- a/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusServerTestCase.java
+++ b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusServerTestCase.java
@@ -352,7 +352,7 @@ public class MbusServerTestCase {
}
@Override
- public void close() { }
+ public void connect() { }
@Override
public ResourceReference refer() {
diff --git a/messagebus/abi-spec.json b/messagebus/abi-spec.json
index 85c1eaa4f09..623904bef8d 100644
--- a/messagebus/abi-spec.json
+++ b/messagebus/abi-spec.json
@@ -64,9 +64,23 @@
],
"fields": []
},
+ "com.yahoo.messagebus.Connectable": {
+ "superClass": "java.lang.Object",
+ "interfaces": [],
+ "attributes": [
+ "public",
+ "interface",
+ "abstract"
+ ],
+ "methods": [
+ "public abstract void connect()"
+ ],
+ "fields": []
+ },
"com.yahoo.messagebus.DestinationSession": {
"superClass": "java.lang.Object",
"interfaces": [
+ "com.yahoo.messagebus.Connectable",
"com.yahoo.messagebus.MessageHandler"
],
"attributes": [
@@ -81,7 +95,8 @@
"public com.yahoo.messagebus.MessageHandler getMessageHandler()",
"public java.lang.String getConnectionSpec()",
"public java.lang.String getName()",
- "public void handleMessage(com.yahoo.messagebus.Message)"
+ "public void handleMessage(com.yahoo.messagebus.Message)",
+ "public void connect()"
],
"fields": []
},
@@ -213,7 +228,8 @@
"superClass": "java.lang.Object",
"interfaces": [
"com.yahoo.messagebus.MessageHandler",
- "com.yahoo.messagebus.ReplyHandler"
+ "com.yahoo.messagebus.ReplyHandler",
+ "com.yahoo.messagebus.Connectable"
],
"attributes": [
"public",
@@ -228,7 +244,8 @@
"public java.lang.String getConnectionSpec()",
"public java.lang.String getName()",
"public void handleMessage(com.yahoo.messagebus.Message)",
- "public void handleReply(com.yahoo.messagebus.Reply)"
+ "public void handleReply(com.yahoo.messagebus.Reply)",
+ "public void connect()"
],
"fields": []
},
@@ -319,8 +336,11 @@
"public com.yahoo.messagebus.SourceSession createSourceSession(com.yahoo.messagebus.SourceSessionParams)",
"public com.yahoo.messagebus.IntermediateSession createIntermediateSession(java.lang.String, boolean, com.yahoo.messagebus.MessageHandler, com.yahoo.messagebus.ReplyHandler)",
"public synchronized com.yahoo.messagebus.IntermediateSession createIntermediateSession(com.yahoo.messagebus.IntermediateSessionParams)",
+ "public synchronized com.yahoo.messagebus.IntermediateSession createDetachedIntermediateSession(com.yahoo.messagebus.IntermediateSessionParams)",
"public com.yahoo.messagebus.DestinationSession createDestinationSession(java.lang.String, boolean, com.yahoo.messagebus.MessageHandler)",
"public synchronized com.yahoo.messagebus.DestinationSession createDestinationSession(com.yahoo.messagebus.DestinationSessionParams)",
+ "public synchronized com.yahoo.messagebus.DestinationSession createDetachedDestinationSession(com.yahoo.messagebus.DestinationSessionParams)",
+ "public void connect(java.lang.String, boolean)",
"public synchronized void unregisterSession(java.lang.String, boolean)",
"public void handleMessage(com.yahoo.messagebus.Message)",
"public void handleReply(com.yahoo.messagebus.Reply)",
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/Connectable.java b/messagebus/src/main/java/com/yahoo/messagebus/Connectable.java
new file mode 100644
index 00000000000..40629859d5c
--- /dev/null
+++ b/messagebus/src/main/java/com/yahoo/messagebus/Connectable.java
@@ -0,0 +1,13 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus;
+
+/**
+ * Something which can be connected to a network when ready to receive incoming requests.
+ *
+ * @author jonmv
+ */
+public interface Connectable {
+
+ void connect();
+
+}
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/DestinationSession.java b/messagebus/src/main/java/com/yahoo/messagebus/DestinationSession.java
index a4b5422d502..484305decdc 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/DestinationSession.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/DestinationSession.java
@@ -9,7 +9,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
*
* @author Simon Thoresen Hult
*/
-public final class DestinationSession implements MessageHandler {
+public final class DestinationSession implements Connectable, MessageHandler {
private final AtomicBoolean destroyed = new AtomicBoolean(false);
private final String name;
@@ -125,4 +125,9 @@ public final class DestinationSession implements MessageHandler {
msgHandler.handleMessage(msg);
}
+ @Override
+ public void connect() {
+ mbus.connect(name, broadcastName);
+ }
+
}
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/IntermediateSession.java b/messagebus/src/main/java/com/yahoo/messagebus/IntermediateSession.java
index ca772ce6c3a..44215692ca4 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/IntermediateSession.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/IntermediateSession.java
@@ -9,7 +9,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
*
* @author Simon Thoresen Hult
*/
-public final class IntermediateSession implements MessageHandler, ReplyHandler {
+public final class IntermediateSession implements MessageHandler, ReplyHandler, Connectable {
private final AtomicBoolean destroyed = new AtomicBoolean(false);
private final String name;
@@ -114,4 +114,9 @@ public final class IntermediateSession implements MessageHandler, ReplyHandler {
}
}
+ @Override
+ public void connect() {
+ mbus.connect(name, broadcastName);
+ }
+
}
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java b/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java
index 4b674b86aeb..313a5dccdaf 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java
@@ -271,6 +271,12 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler,
* @return The created session.
*/
public synchronized IntermediateSession createIntermediateSession(IntermediateSessionParams params) {
+ IntermediateSession session = createDetachedIntermediateSession(params);
+ connect(params.getName(), params.getBroadcastName());
+ return session;
+ }
+
+ public synchronized IntermediateSession createDetachedIntermediateSession(IntermediateSessionParams params) {
if (destroyed.get()) {
throw new IllegalStateException("Object is destroyed.");
}
@@ -279,7 +285,6 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler,
}
IntermediateSession session = new IntermediateSession(this, params);
sessions.put(params.getName(), session);
- net.registerSession(params.getName(), this, params.getBroadcastName());
return session;
}
@@ -312,6 +317,12 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler,
* @return The created session.
*/
public synchronized DestinationSession createDestinationSession(DestinationSessionParams params) {
+ DestinationSession session = createDetachedDestinationSession(params);
+ connect(params.getName(), params.getBroadcastName());
+ return session;
+ }
+
+ public synchronized DestinationSession createDetachedDestinationSession(DestinationSessionParams params) {
if (destroyed.get()) {
throw new IllegalStateException("Object is destroyed.");
}
@@ -320,10 +331,14 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler,
}
DestinationSession session = new DestinationSession(this, params);
sessions.put(params.getName(), session);
- net.registerSession(params.getName(), this, params.getBroadcastName());
return session;
}
+ /** Connects the given session to the network, so it will receive requests. */
+ public void connect(String session, boolean broadcast) {
+ net.registerSession(session, this, broadcast);
+ }
+
/**
* <p>This method is invoked by the {@link
* com.yahoo.messagebus.IntermediateSession#destroy()} to unregister
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/NetworkMultiplexer.java b/messagebus/src/main/java/com/yahoo/messagebus/network/NetworkMultiplexer.java
index 5e6a8c7688e..e75b5d0934a 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/network/NetworkMultiplexer.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/network/NetworkMultiplexer.java
@@ -5,9 +5,11 @@ import com.yahoo.messagebus.Message;
import com.yahoo.messagebus.Protocol;
import com.yahoo.text.Utf8Array;
+import java.util.Deque;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.logging.Logger;
@@ -29,8 +31,8 @@ public class NetworkMultiplexer implements NetworkOwner {
private static final Logger log = Logger.getLogger(NetworkMultiplexer.class.getName());
private final Network net;
- private final Queue<NetworkOwner> owners = new ConcurrentLinkedQueue<>();
- private final Map<String, Queue<NetworkOwner>> sessions = new ConcurrentHashMap<>();
+ private final Deque<NetworkOwner> owners = new ConcurrentLinkedDeque<>();
+ private final Map<String, Deque<NetworkOwner>> sessions = new ConcurrentHashMap<>();
private final boolean shared;
private NetworkMultiplexer(Network net, boolean shared) {
@@ -52,23 +54,21 @@ public class NetworkMultiplexer implements NetworkOwner {
public void registerSession(String session, NetworkOwner owner, boolean broadcast) {
sessions.compute(session, (name, owners) -> {
if (owners == null) {
- owners = new ConcurrentLinkedQueue<>();
+ owners = new ConcurrentLinkedDeque<>();
if (broadcast)
net.registerSession(session);
}
else if (owners.contains(owner))
throw new IllegalArgumentException("Session '" + session + "' with owner '" + owner + "' already registered with this");
- owners.add(owner);
+ owners.push(owner);
return owners;
});
}
public void unregisterSession(String session, NetworkOwner owner, boolean broadcast) {
- sessions.compute(session, (name, owners) -> {
- if (owners == null || ! owners.remove(owner))
- throw new IllegalArgumentException("Session '" + session + "' not registered with owner '" + owner + "'");
-
+ sessions.computeIfPresent(session, (name, owners) -> {
+ owners.remove(owner);
if (owners.isEmpty()) {
if (broadcast)
net.unregisterSession(session);
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/network/NetworkMultiplexerTest.java b/messagebus/src/test/java/com/yahoo/messagebus/network/NetworkMultiplexerTest.java
index f1d83550601..3bf754f29c7 100644
--- a/messagebus/src/test/java/com/yahoo/messagebus/network/NetworkMultiplexerTest.java
+++ b/messagebus/src/test/java/com/yahoo/messagebus/network/NetworkMultiplexerTest.java
@@ -51,7 +51,6 @@ public class NetworkMultiplexerTest {
assertEquals(Set.of("s1"), net.registered);
shared.attach(owner2);
- shared.registerSession("s1", owner2, true);
shared.registerSession("s2", owner2, true);
shared.registerSession("s3", owner2, false);
assertEquals(Set.of("s1", "s2"), net.registered);
@@ -68,13 +67,18 @@ public class NetworkMultiplexerTest {
Message message2 = new SimpleMessage("two");
Message message3 = new SimpleMessage("three");
Message message4 = new SimpleMessage("four");
+ Message message5 = new SimpleMessage("five");
shared.deliverMessage(message1, "s1");
shared.deliverMessage(message2, "s2");
- shared.unregisterSession("s1", owner1, true);
+
+ // New "s1" owner connects, and should have new requests.
+ shared.registerSession("s1", owner2, true);
shared.deliverMessage(message3, "s1");
shared.deliverMessage(message4, "s3");
+ shared.unregisterSession("s1", owner1, true);
+ shared.deliverMessage(message5, "s1");
assertEquals(Map.of("s1", List.of(message1)), owner1.messages);
- assertEquals(Map.of("s2", List.of(message2), "s1", List.of(message3), "s3", List.of(message4)), owner2.messages);
+ assertEquals(Map.of("s2", List.of(message2), "s1", List.of(message3, message5), "s3", List.of(message4)), owner2.messages);
shared.detach(owner1);
assertEquals(protocol2, shared.getProtocol(name));