From b919b72514821cb98028af232fab2eaec6bf414a Mon Sep 17 00:00:00 2001 From: Jon Marius Venstad Date: Mon, 16 Aug 2021 13:31:23 +0200 Subject: Insert a NetworkMultiplexer between MessageBus and its Network --- messagebus/abi-spec.json | 1 + .../main/java/com/yahoo/messagebus/MessageBus.java | 38 ++++-- .../messagebus/network/NetworkMultiplexer.java | 143 +++++++++++++++++++++ 3 files changed, 169 insertions(+), 13 deletions(-) create mode 100644 messagebus/src/main/java/com/yahoo/messagebus/network/NetworkMultiplexer.java (limited to 'messagebus') diff --git a/messagebus/abi-spec.json b/messagebus/abi-spec.json index e6d2b4537fb..85c1eaa4f09 100644 --- a/messagebus/abi-spec.json +++ b/messagebus/abi-spec.json @@ -311,6 +311,7 @@ "public void register(com.yahoo.messagebus.MessageBus$SendBlockedMessages)", "public void (com.yahoo.messagebus.network.Network, java.util.List)", "public void (com.yahoo.messagebus.network.Network, com.yahoo.messagebus.MessageBusParams)", + "public void (com.yahoo.messagebus.network.NetworkMultiplexer, com.yahoo.messagebus.MessageBusParams)", "public boolean destroy()", "public void sync()", "public com.yahoo.messagebus.SourceSession createSourceSession(com.yahoo.messagebus.ReplyHandler)", diff --git a/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java b/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java index d1077d7292d..818f46bc470 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java @@ -1,10 +1,10 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.messagebus; -import com.yahoo.concurrent.CopyOnWriteHashMap; import com.yahoo.concurrent.SystemTimer; import java.util.logging.Level; import com.yahoo.messagebus.network.Network; +import com.yahoo.messagebus.network.NetworkMultiplexer; import com.yahoo.messagebus.network.NetworkOwner; import com.yahoo.messagebus.routing.Resender; import com.yahoo.messagebus.routing.RetryPolicy; @@ -64,7 +64,7 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler, private final ProtocolRepository protocolRepository = new ProtocolRepository(); private final AtomicReference> tablesRef = new AtomicReference<>(null); private final Map sessions = new ConcurrentHashMap<>(); - private final Network net; + private final NetworkMultiplexer net; private final Messenger msn; private final Resender resender; private int maxPendingCount; @@ -117,13 +117,25 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler, /** *

Constructs an instance of message bus. This requires a network object - * that it will associate with. This assignment may not change during the - * lifetime of this message bus.

+ * that it will associate with. This assignment may not change during the lifetime + * of this message bus, and this bus will be the single owner of this net.

* * @param net The network to associate with. * @param params The parameters that controls this bus. */ public MessageBus(Network net, MessageBusParams params) { + this(NetworkMultiplexer.dedicated(net), params); + } + + /** + *

Constructs an instance of message bus. This requires a network multiplexer + * that it will associate with. This assignment may not change during the + * lifetime of this message bus.

+ * + * @param net The network multiplexer to associate with. + * @param params The parameters that controls this bus. + */ + public MessageBus(NetworkMultiplexer net, MessageBusParams params) { // Add all known protocols to the repository. maxPendingCount = params.getMaxPendingCount(); maxPendingSize = params.getMaxPendingSize(); @@ -134,7 +146,7 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler, // Attach and start network. this.net = net; net.attach(this); - if ( ! net.waitUntilReady(120)) + if ( ! net.net().waitUntilReady(120)) throw new IllegalStateException("Network failed to become ready in time."); // Start messenger. @@ -167,7 +179,7 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler, careTaker.join(); } catch (InterruptedException e) { } protocolRepository.clearPolicyCache(); - net.shutdown(); + net.detach(this); msn.destroy(); if (resender != null) { resender.destroy(); @@ -186,7 +198,7 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler, */ public void sync() { msn.sync(); - net.sync(); + net.net().sync(); } /** @@ -268,7 +280,7 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler, IntermediateSession session = new IntermediateSession(this, params); sessions.put(params.getName(), session); if (params.getBroadcastName()) { - net.registerSession(params.getName()); + net.registerSession(params.getName(), this); } return session; } @@ -311,7 +323,7 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler, DestinationSession session = new DestinationSession(this, params); sessions.put(params.getName(), session); if (params.getBroadcastName()) { - net.registerSession(params.getName()); + net.registerSession(params.getName(), this); } return session; } @@ -326,7 +338,7 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler, */ public synchronized void unregisterSession(String name, boolean broadcastName) { if (broadcastName) { - net.unregisterSession(name); + net.unregisterSession(name, this); } sessions.remove(name); } @@ -371,7 +383,7 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler, deliverError(msg, ErrorCode.SEQUENCE_ERROR, "Bucket sequences not supported when resender is enabled."); return; } - SendProxy proxy = new SendProxy(this, net, resender); + SendProxy proxy = new SendProxy(this, net.net(), resender); msn.deliverMessage(msg, proxy); } @@ -396,7 +408,7 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler, if (msgHandler == null) { deliverError(msg, ErrorCode.UNKNOWN_SESSION, "Session '" + session + "' does not exist."); } else if (!checkPending(msg)) { - deliverError(msg, ErrorCode.SESSION_BUSY, "Session '" + net.getConnectionSpec() + "/" + session + + deliverError(msg, ErrorCode.SESSION_BUSY, "Session '" + net.net().getConnectionSpec() + "/" + session + "' is busy, try again later."); } else { msn.deliverMessage(msg, msgHandler); @@ -564,7 +576,7 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler, * @return The connection string. */ public String getConnectionSpec() { - return net.getConnectionSpec(); + return net.net().getConnectionSpec(); } /** diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/NetworkMultiplexer.java b/messagebus/src/main/java/com/yahoo/messagebus/network/NetworkMultiplexer.java new file mode 100644 index 00000000000..0a2a69368ac --- /dev/null +++ b/messagebus/src/main/java/com/yahoo/messagebus/network/NetworkMultiplexer.java @@ -0,0 +1,143 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.network; + +import com.yahoo.component.AbstractComponent; +import com.yahoo.messagebus.Message; +import com.yahoo.messagebus.Protocol; +import com.yahoo.text.Utf8Array; + +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.logging.Logger; + +/** + * A bridge between the reusable, singleton RPC network, and the generational message bus which uses this. + * The RPC network is required to be singular because of its unique resources, such as sockets. + * This is complicated by the message bus potentially existing in different graph generation at any point in + * time, with all copies potential users of the network interface, but where each message bus-registered session + * should belong to a single message bus. This class solves these problems by tracking which sessions are + * active in which message bus instance, and by (de)registering only when a session is registered to (no) message + * bus instances. + * + * In time, this should allow us to get rid of the shared-this-and-that in the container, too ... + * + * @author jonmv + */ +public class NetworkMultiplexer extends AbstractComponent implements NetworkOwner { + + private static final Logger log = Logger.getLogger(NetworkMultiplexer.class.getName()); + + private final Network net; + private final Queue owners = new ConcurrentLinkedQueue<>(); + private final Map> sessions = new ConcurrentHashMap<>(); + private final boolean shared; + + private NetworkMultiplexer(Network net, boolean shared) { + net.attach(this); + this.net = net; + this.shared = shared; + } + + /** Returns a network multiplexer which will be shared between several {@link NetworkOwner}s. */ + public static NetworkMultiplexer shared(Network net) { + return new NetworkMultiplexer(net, true); + } + + /** Returns a network multiplexer with a single {@link NetworkOwner}, which shuts down when this owner detaches. */ + public static NetworkMultiplexer dedicated(Network net) { + return new NetworkMultiplexer(net, false); + } + + public void registerSession(String session, NetworkOwner owner) { + sessions.compute(session, (name, owners) -> { + if (owners == null) { + owners = new ConcurrentLinkedQueue<>(); + net.registerSession(session); + } + else if (owners.contains(owner)) + throw new IllegalArgumentException("Session '" + session + "' with owner '" + owner + "' already registered with this"); + + owners.add(owner); + return owners; + }); + } + + public void unregisterSession(String session, NetworkOwner owner) { + sessions.compute(session, (name, owners) -> { + if (owners == null || ! owners.remove(owner)) + throw new IllegalArgumentException("Session '" + session + "' not registered with owner '" + owner + "'"); + + if (owners.isEmpty()) { + net.unregisterSession(session); + return null; + } + return owners; + }); + } + + @Override + public Protocol getProtocol(Utf8Array name) { + for (NetworkOwner owner : owners) { + Protocol protocol = owner.getProtocol(name); + if (protocol != null) + return protocol; + } + return null; + } + + @Override + public void deliverMessage(Message message, String session) { + // Send to first owner which has registered this session, or fall back to first attached owner (for rejection). + NetworkOwner owner = sessions.getOrDefault(session, owners).peek(); + if (owner == null) { // Should not happen. + log.warning(this + " received message '" + message + "' with no owners attached"); + message.discard(); + } + else + owner.deliverMessage(message, session); + } + + public void attach(NetworkOwner owner) { + if (owners.contains(owner)) + throw new IllegalArgumentException(owner + " is already attached to this"); + + owners.add(owner); + } + + public void detach(NetworkOwner owner) { + if ( ! owners.remove(owner)) + throw new IllegalArgumentException(owner + " not attached to this"); + + if ( ! shared && owners.isEmpty()) + net.shutdown(); + } + + @Override + public void deconstruct() { + if ( ! shared) + throw new UnsupportedOperationException("Deconstruct called on a dedicated multiplexer; " + + "this shuts down when detached from"); + + if ( ! owners.isEmpty()) + log.warning("NetworkMultiplexer shut down before all owners detached: " + this); + + net.shutdown(); + } + + public Network net() { + return net; + } + + @Override + public String toString() { + return "NetworkMultiplexer{" + + "net=" + net + + ", owners=" + owners + + ", sessions=" + sessions + + ", shared=" + shared + + '}'; + } + +} \ No newline at end of file -- cgit v1.2.3