aboutsummaryrefslogtreecommitdiffstats
path: root/messagebus
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2021-08-23 10:14:03 +0200
committerJon Marius Venstad <venstad@gmail.com>2021-08-23 10:14:03 +0200
commit26d36def2b74166bda5000660ad8a9467f57e728 (patch)
tree59ba2afaaecdf612c1aed82aef153eb3b9f6ead6 /messagebus
parent43402b23dc74fe6550d231b26e8c50327e1971f9 (diff)
Let server session.connect() conncet to net, and use newest destination
No practical changes, but less API misuse. @bjorncs please review.
Diffstat (limited to 'messagebus')
-rw-r--r--messagebus/abi-spec.json26
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/Connectable.java13
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/DestinationSession.java7
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/IntermediateSession.java7
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java19
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/network/NetworkMultiplexer.java16
-rw-r--r--messagebus/src/test/java/com/yahoo/messagebus/network/NetworkMultiplexerTest.java10
7 files changed, 80 insertions, 18 deletions
diff --git a/messagebus/abi-spec.json b/messagebus/abi-spec.json
index 85c1eaa4f09..623904bef8d 100644
--- a/messagebus/abi-spec.json
+++ b/messagebus/abi-spec.json
@@ -64,9 +64,23 @@
],
"fields": []
},
+ "com.yahoo.messagebus.Connectable": {
+ "superClass": "java.lang.Object",
+ "interfaces": [],
+ "attributes": [
+ "public",
+ "interface",
+ "abstract"
+ ],
+ "methods": [
+ "public abstract void connect()"
+ ],
+ "fields": []
+ },
"com.yahoo.messagebus.DestinationSession": {
"superClass": "java.lang.Object",
"interfaces": [
+ "com.yahoo.messagebus.Connectable",
"com.yahoo.messagebus.MessageHandler"
],
"attributes": [
@@ -81,7 +95,8 @@
"public com.yahoo.messagebus.MessageHandler getMessageHandler()",
"public java.lang.String getConnectionSpec()",
"public java.lang.String getName()",
- "public void handleMessage(com.yahoo.messagebus.Message)"
+ "public void handleMessage(com.yahoo.messagebus.Message)",
+ "public void connect()"
],
"fields": []
},
@@ -213,7 +228,8 @@
"superClass": "java.lang.Object",
"interfaces": [
"com.yahoo.messagebus.MessageHandler",
- "com.yahoo.messagebus.ReplyHandler"
+ "com.yahoo.messagebus.ReplyHandler",
+ "com.yahoo.messagebus.Connectable"
],
"attributes": [
"public",
@@ -228,7 +244,8 @@
"public java.lang.String getConnectionSpec()",
"public java.lang.String getName()",
"public void handleMessage(com.yahoo.messagebus.Message)",
- "public void handleReply(com.yahoo.messagebus.Reply)"
+ "public void handleReply(com.yahoo.messagebus.Reply)",
+ "public void connect()"
],
"fields": []
},
@@ -319,8 +336,11 @@
"public com.yahoo.messagebus.SourceSession createSourceSession(com.yahoo.messagebus.SourceSessionParams)",
"public com.yahoo.messagebus.IntermediateSession createIntermediateSession(java.lang.String, boolean, com.yahoo.messagebus.MessageHandler, com.yahoo.messagebus.ReplyHandler)",
"public synchronized com.yahoo.messagebus.IntermediateSession createIntermediateSession(com.yahoo.messagebus.IntermediateSessionParams)",
+ "public synchronized com.yahoo.messagebus.IntermediateSession createDetachedIntermediateSession(com.yahoo.messagebus.IntermediateSessionParams)",
"public com.yahoo.messagebus.DestinationSession createDestinationSession(java.lang.String, boolean, com.yahoo.messagebus.MessageHandler)",
"public synchronized com.yahoo.messagebus.DestinationSession createDestinationSession(com.yahoo.messagebus.DestinationSessionParams)",
+ "public synchronized com.yahoo.messagebus.DestinationSession createDetachedDestinationSession(com.yahoo.messagebus.DestinationSessionParams)",
+ "public void connect(java.lang.String, boolean)",
"public synchronized void unregisterSession(java.lang.String, boolean)",
"public void handleMessage(com.yahoo.messagebus.Message)",
"public void handleReply(com.yahoo.messagebus.Reply)",
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/Connectable.java b/messagebus/src/main/java/com/yahoo/messagebus/Connectable.java
new file mode 100644
index 00000000000..40629859d5c
--- /dev/null
+++ b/messagebus/src/main/java/com/yahoo/messagebus/Connectable.java
@@ -0,0 +1,13 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus;
+
+/**
+ * Something which can be connected to a network when ready to receive incoming requests.
+ *
+ * @author jonmv
+ */
+public interface Connectable {
+
+ void connect();
+
+}
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/DestinationSession.java b/messagebus/src/main/java/com/yahoo/messagebus/DestinationSession.java
index a4b5422d502..484305decdc 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/DestinationSession.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/DestinationSession.java
@@ -9,7 +9,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
*
* @author Simon Thoresen Hult
*/
-public final class DestinationSession implements MessageHandler {
+public final class DestinationSession implements Connectable, MessageHandler {
private final AtomicBoolean destroyed = new AtomicBoolean(false);
private final String name;
@@ -125,4 +125,9 @@ public final class DestinationSession implements MessageHandler {
msgHandler.handleMessage(msg);
}
+ @Override
+ public void connect() {
+ mbus.connect(name, broadcastName);
+ }
+
}
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/IntermediateSession.java b/messagebus/src/main/java/com/yahoo/messagebus/IntermediateSession.java
index ca772ce6c3a..44215692ca4 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/IntermediateSession.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/IntermediateSession.java
@@ -9,7 +9,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
*
* @author Simon Thoresen Hult
*/
-public final class IntermediateSession implements MessageHandler, ReplyHandler {
+public final class IntermediateSession implements MessageHandler, ReplyHandler, Connectable {
private final AtomicBoolean destroyed = new AtomicBoolean(false);
private final String name;
@@ -114,4 +114,9 @@ public final class IntermediateSession implements MessageHandler, ReplyHandler {
}
}
+ @Override
+ public void connect() {
+ mbus.connect(name, broadcastName);
+ }
+
}
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java b/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java
index 4b674b86aeb..313a5dccdaf 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java
@@ -271,6 +271,12 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler,
* @return The created session.
*/
public synchronized IntermediateSession createIntermediateSession(IntermediateSessionParams params) {
+ IntermediateSession session = createDetachedIntermediateSession(params);
+ connect(params.getName(), params.getBroadcastName());
+ return session;
+ }
+
+ public synchronized IntermediateSession createDetachedIntermediateSession(IntermediateSessionParams params) {
if (destroyed.get()) {
throw new IllegalStateException("Object is destroyed.");
}
@@ -279,7 +285,6 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler,
}
IntermediateSession session = new IntermediateSession(this, params);
sessions.put(params.getName(), session);
- net.registerSession(params.getName(), this, params.getBroadcastName());
return session;
}
@@ -312,6 +317,12 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler,
* @return The created session.
*/
public synchronized DestinationSession createDestinationSession(DestinationSessionParams params) {
+ DestinationSession session = createDetachedDestinationSession(params);
+ connect(params.getName(), params.getBroadcastName());
+ return session;
+ }
+
+ public synchronized DestinationSession createDetachedDestinationSession(DestinationSessionParams params) {
if (destroyed.get()) {
throw new IllegalStateException("Object is destroyed.");
}
@@ -320,10 +331,14 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler,
}
DestinationSession session = new DestinationSession(this, params);
sessions.put(params.getName(), session);
- net.registerSession(params.getName(), this, params.getBroadcastName());
return session;
}
+ /** Connects the given session to the network, so it will receive requests. */
+ public void connect(String session, boolean broadcast) {
+ net.registerSession(session, this, broadcast);
+ }
+
/**
* <p>This method is invoked by the {@link
* com.yahoo.messagebus.IntermediateSession#destroy()} to unregister
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/NetworkMultiplexer.java b/messagebus/src/main/java/com/yahoo/messagebus/network/NetworkMultiplexer.java
index 5e6a8c7688e..e75b5d0934a 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/network/NetworkMultiplexer.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/network/NetworkMultiplexer.java
@@ -5,9 +5,11 @@ import com.yahoo.messagebus.Message;
import com.yahoo.messagebus.Protocol;
import com.yahoo.text.Utf8Array;
+import java.util.Deque;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.logging.Logger;
@@ -29,8 +31,8 @@ public class NetworkMultiplexer implements NetworkOwner {
private static final Logger log = Logger.getLogger(NetworkMultiplexer.class.getName());
private final Network net;
- private final Queue<NetworkOwner> owners = new ConcurrentLinkedQueue<>();
- private final Map<String, Queue<NetworkOwner>> sessions = new ConcurrentHashMap<>();
+ private final Deque<NetworkOwner> owners = new ConcurrentLinkedDeque<>();
+ private final Map<String, Deque<NetworkOwner>> sessions = new ConcurrentHashMap<>();
private final boolean shared;
private NetworkMultiplexer(Network net, boolean shared) {
@@ -52,23 +54,21 @@ public class NetworkMultiplexer implements NetworkOwner {
public void registerSession(String session, NetworkOwner owner, boolean broadcast) {
sessions.compute(session, (name, owners) -> {
if (owners == null) {
- owners = new ConcurrentLinkedQueue<>();
+ owners = new ConcurrentLinkedDeque<>();
if (broadcast)
net.registerSession(session);
}
else if (owners.contains(owner))
throw new IllegalArgumentException("Session '" + session + "' with owner '" + owner + "' already registered with this");
- owners.add(owner);
+ owners.push(owner);
return owners;
});
}
public void unregisterSession(String session, NetworkOwner owner, boolean broadcast) {
- sessions.compute(session, (name, owners) -> {
- if (owners == null || ! owners.remove(owner))
- throw new IllegalArgumentException("Session '" + session + "' not registered with owner '" + owner + "'");
-
+ sessions.computeIfPresent(session, (name, owners) -> {
+ owners.remove(owner);
if (owners.isEmpty()) {
if (broadcast)
net.unregisterSession(session);
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/network/NetworkMultiplexerTest.java b/messagebus/src/test/java/com/yahoo/messagebus/network/NetworkMultiplexerTest.java
index f1d83550601..3bf754f29c7 100644
--- a/messagebus/src/test/java/com/yahoo/messagebus/network/NetworkMultiplexerTest.java
+++ b/messagebus/src/test/java/com/yahoo/messagebus/network/NetworkMultiplexerTest.java
@@ -51,7 +51,6 @@ public class NetworkMultiplexerTest {
assertEquals(Set.of("s1"), net.registered);
shared.attach(owner2);
- shared.registerSession("s1", owner2, true);
shared.registerSession("s2", owner2, true);
shared.registerSession("s3", owner2, false);
assertEquals(Set.of("s1", "s2"), net.registered);
@@ -68,13 +67,18 @@ public class NetworkMultiplexerTest {
Message message2 = new SimpleMessage("two");
Message message3 = new SimpleMessage("three");
Message message4 = new SimpleMessage("four");
+ Message message5 = new SimpleMessage("five");
shared.deliverMessage(message1, "s1");
shared.deliverMessage(message2, "s2");
- shared.unregisterSession("s1", owner1, true);
+
+ // New "s1" owner connects, and should have new requests.
+ shared.registerSession("s1", owner2, true);
shared.deliverMessage(message3, "s1");
shared.deliverMessage(message4, "s3");
+ shared.unregisterSession("s1", owner1, true);
+ shared.deliverMessage(message5, "s1");
assertEquals(Map.of("s1", List.of(message1)), owner1.messages);
- assertEquals(Map.of("s2", List.of(message2), "s1", List.of(message3), "s3", List.of(message4)), owner2.messages);
+ assertEquals(Map.of("s2", List.of(message2), "s1", List.of(message3, message5), "s3", List.of(message4)), owner2.messages);
shared.detach(owner1);
assertEquals(protocol2, shared.getProtocol(name));