aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--container-messagebus/src/test/java/com/yahoo/container/jdisc/messagebus/MbusSessionKeyTestCase.java2
-rw-r--r--messagebus/abi-spec.json1
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java38
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/network/NetworkMultiplexer.java143
4 files changed, 170 insertions, 14 deletions
diff --git a/container-messagebus/src/test/java/com/yahoo/container/jdisc/messagebus/MbusSessionKeyTestCase.java b/container-messagebus/src/test/java/com/yahoo/container/jdisc/messagebus/MbusSessionKeyTestCase.java
index 89bc5b9cecd..709f78f5cf0 100644
--- a/container-messagebus/src/test/java/com/yahoo/container/jdisc/messagebus/MbusSessionKeyTestCase.java
+++ b/container-messagebus/src/test/java/com/yahoo/container/jdisc/messagebus/MbusSessionKeyTestCase.java
@@ -17,7 +17,7 @@ import static org.junit.Assert.assertTrue;
/**
* Check the completeness of the mbus session key classes.
*
- * @author <a href="mailto:steinar@yahoo-inc.com">Steinar Knutsen</a>
+ * @author Steinar Knutsen
*/
public class MbusSessionKeyTestCase {
diff --git a/messagebus/abi-spec.json b/messagebus/abi-spec.json
index e6d2b4537fb..85c1eaa4f09 100644
--- a/messagebus/abi-spec.json
+++ b/messagebus/abi-spec.json
@@ -311,6 +311,7 @@
"public void register(com.yahoo.messagebus.MessageBus$SendBlockedMessages)",
"public void <init>(com.yahoo.messagebus.network.Network, java.util.List)",
"public void <init>(com.yahoo.messagebus.network.Network, com.yahoo.messagebus.MessageBusParams)",
+ "public void <init>(com.yahoo.messagebus.network.NetworkMultiplexer, com.yahoo.messagebus.MessageBusParams)",
"public boolean destroy()",
"public void sync()",
"public com.yahoo.messagebus.SourceSession createSourceSession(com.yahoo.messagebus.ReplyHandler)",
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java b/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java
index d1077d7292d..818f46bc470 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java
@@ -1,10 +1,10 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.messagebus;
-import com.yahoo.concurrent.CopyOnWriteHashMap;
import com.yahoo.concurrent.SystemTimer;
import java.util.logging.Level;
import com.yahoo.messagebus.network.Network;
+import com.yahoo.messagebus.network.NetworkMultiplexer;
import com.yahoo.messagebus.network.NetworkOwner;
import com.yahoo.messagebus.routing.Resender;
import com.yahoo.messagebus.routing.RetryPolicy;
@@ -64,7 +64,7 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler,
private final ProtocolRepository protocolRepository = new ProtocolRepository();
private final AtomicReference<Map<String, RoutingTable>> tablesRef = new AtomicReference<>(null);
private final Map<String, MessageHandler> sessions = new ConcurrentHashMap<>();
- private final Network net;
+ private final NetworkMultiplexer net;
private final Messenger msn;
private final Resender resender;
private int maxPendingCount;
@@ -117,13 +117,25 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler,
/**
* <p>Constructs an instance of message bus. This requires a network object
- * that it will associate with. This assignment may not change during the
- * lifetime of this message bus.</p>
+ * that it will associate with. This assignment may not change during the lifetime
+ * of this message bus, and this bus will be the single owner of this net.</p>
*
* @param net The network to associate with.
* @param params The parameters that controls this bus.
*/
public MessageBus(Network net, MessageBusParams params) {
+ this(NetworkMultiplexer.dedicated(net), params);
+ }
+
+ /**
+ * <p>Constructs an instance of message bus. This requires a network multiplexer
+ * that it will associate with. This assignment may not change during the
+ * lifetime of this message bus.</p>
+ *
+ * @param net The network multiplexer to associate with.
+ * @param params The parameters that controls this bus.
+ */
+ public MessageBus(NetworkMultiplexer net, MessageBusParams params) {
// Add all known protocols to the repository.
maxPendingCount = params.getMaxPendingCount();
maxPendingSize = params.getMaxPendingSize();
@@ -134,7 +146,7 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler,
// Attach and start network.
this.net = net;
net.attach(this);
- if ( ! net.waitUntilReady(120))
+ if ( ! net.net().waitUntilReady(120))
throw new IllegalStateException("Network failed to become ready in time.");
// Start messenger.
@@ -167,7 +179,7 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler,
careTaker.join();
} catch (InterruptedException e) { }
protocolRepository.clearPolicyCache();
- net.shutdown();
+ net.detach(this);
msn.destroy();
if (resender != null) {
resender.destroy();
@@ -186,7 +198,7 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler,
*/
public void sync() {
msn.sync();
- net.sync();
+ net.net().sync();
}
/**
@@ -268,7 +280,7 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler,
IntermediateSession session = new IntermediateSession(this, params);
sessions.put(params.getName(), session);
if (params.getBroadcastName()) {
- net.registerSession(params.getName());
+ net.registerSession(params.getName(), this);
}
return session;
}
@@ -311,7 +323,7 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler,
DestinationSession session = new DestinationSession(this, params);
sessions.put(params.getName(), session);
if (params.getBroadcastName()) {
- net.registerSession(params.getName());
+ net.registerSession(params.getName(), this);
}
return session;
}
@@ -326,7 +338,7 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler,
*/
public synchronized void unregisterSession(String name, boolean broadcastName) {
if (broadcastName) {
- net.unregisterSession(name);
+ net.unregisterSession(name, this);
}
sessions.remove(name);
}
@@ -371,7 +383,7 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler,
deliverError(msg, ErrorCode.SEQUENCE_ERROR, "Bucket sequences not supported when resender is enabled.");
return;
}
- SendProxy proxy = new SendProxy(this, net, resender);
+ SendProxy proxy = new SendProxy(this, net.net(), resender);
msn.deliverMessage(msg, proxy);
}
@@ -396,7 +408,7 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler,
if (msgHandler == null) {
deliverError(msg, ErrorCode.UNKNOWN_SESSION, "Session '" + session + "' does not exist.");
} else if (!checkPending(msg)) {
- deliverError(msg, ErrorCode.SESSION_BUSY, "Session '" + net.getConnectionSpec() + "/" + session +
+ deliverError(msg, ErrorCode.SESSION_BUSY, "Session '" + net.net().getConnectionSpec() + "/" + session +
"' is busy, try again later.");
} else {
msn.deliverMessage(msg, msgHandler);
@@ -564,7 +576,7 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler,
* @return The connection string.
*/
public String getConnectionSpec() {
- return net.getConnectionSpec();
+ return net.net().getConnectionSpec();
}
/**
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/NetworkMultiplexer.java b/messagebus/src/main/java/com/yahoo/messagebus/network/NetworkMultiplexer.java
new file mode 100644
index 00000000000..0a2a69368ac
--- /dev/null
+++ b/messagebus/src/main/java/com/yahoo/messagebus/network/NetworkMultiplexer.java
@@ -0,0 +1,143 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.network;
+
+import com.yahoo.component.AbstractComponent;
+import com.yahoo.messagebus.Message;
+import com.yahoo.messagebus.Protocol;
+import com.yahoo.text.Utf8Array;
+
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.logging.Logger;
+
+/**
+ * A bridge between the reusable, singleton RPC network, and the generational message bus which uses this.
+ * The RPC network is required to be singular because of its unique resources, such as sockets.
+ * This is complicated by the message bus potentially existing in different graph generation at any point in
+ * time, with all copies potential users of the network interface, but where each message bus-registered session
+ * should belong to a single message bus. This class solves these problems by tracking which sessions are
+ * active in which message bus instance, and by (de)registering only when a session is registered to (no) message
+ * bus instances.
+ *
+ * In time, this should allow us to get rid of the shared-this-and-that in the container, too ...
+ *
+ * @author jonmv
+ */
+public class NetworkMultiplexer extends AbstractComponent implements NetworkOwner {
+
+ private static final Logger log = Logger.getLogger(NetworkMultiplexer.class.getName());
+
+ private final Network net;
+ private final Queue<NetworkOwner> owners = new ConcurrentLinkedQueue<>();
+ private final Map<String, Queue<NetworkOwner>> sessions = new ConcurrentHashMap<>();
+ private final boolean shared;
+
+ private NetworkMultiplexer(Network net, boolean shared) {
+ net.attach(this);
+ this.net = net;
+ this.shared = shared;
+ }
+
+ /** Returns a network multiplexer which will be shared between several {@link NetworkOwner}s. */
+ public static NetworkMultiplexer shared(Network net) {
+ return new NetworkMultiplexer(net, true);
+ }
+
+ /** Returns a network multiplexer with a single {@link NetworkOwner}, which shuts down when this owner detaches. */
+ public static NetworkMultiplexer dedicated(Network net) {
+ return new NetworkMultiplexer(net, false);
+ }
+
+ public void registerSession(String session, NetworkOwner owner) {
+ sessions.compute(session, (name, owners) -> {
+ if (owners == null) {
+ owners = new ConcurrentLinkedQueue<>();
+ net.registerSession(session);
+ }
+ else if (owners.contains(owner))
+ throw new IllegalArgumentException("Session '" + session + "' with owner '" + owner + "' already registered with this");
+
+ owners.add(owner);
+ return owners;
+ });
+ }
+
+ public void unregisterSession(String session, NetworkOwner owner) {
+ sessions.compute(session, (name, owners) -> {
+ if (owners == null || ! owners.remove(owner))
+ throw new IllegalArgumentException("Session '" + session + "' not registered with owner '" + owner + "'");
+
+ if (owners.isEmpty()) {
+ net.unregisterSession(session);
+ return null;
+ }
+ return owners;
+ });
+ }
+
+ @Override
+ public Protocol getProtocol(Utf8Array name) {
+ for (NetworkOwner owner : owners) {
+ Protocol protocol = owner.getProtocol(name);
+ if (protocol != null)
+ return protocol;
+ }
+ return null;
+ }
+
+ @Override
+ public void deliverMessage(Message message, String session) {
+ // Send to first owner which has registered this session, or fall back to first attached owner (for rejection).
+ NetworkOwner owner = sessions.getOrDefault(session, owners).peek();
+ if (owner == null) { // Should not happen.
+ log.warning(this + " received message '" + message + "' with no owners attached");
+ message.discard();
+ }
+ else
+ owner.deliverMessage(message, session);
+ }
+
+ public void attach(NetworkOwner owner) {
+ if (owners.contains(owner))
+ throw new IllegalArgumentException(owner + " is already attached to this");
+
+ owners.add(owner);
+ }
+
+ public void detach(NetworkOwner owner) {
+ if ( ! owners.remove(owner))
+ throw new IllegalArgumentException(owner + " not attached to this");
+
+ if ( ! shared && owners.isEmpty())
+ net.shutdown();
+ }
+
+ @Override
+ public void deconstruct() {
+ if ( ! shared)
+ throw new UnsupportedOperationException("Deconstruct called on a dedicated multiplexer; " +
+ "this shuts down when detached from");
+
+ if ( ! owners.isEmpty())
+ log.warning("NetworkMultiplexer shut down before all owners detached: " + this);
+
+ net.shutdown();
+ }
+
+ public Network net() {
+ return net;
+ }
+
+ @Override
+ public String toString() {
+ return "NetworkMultiplexer{" +
+ "net=" + net +
+ ", owners=" + owners +
+ ", sessions=" + sessions +
+ ", shared=" + shared +
+ '}';
+ }
+
+} \ No newline at end of file