// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.messagebus; import com.yahoo.concurrent.SystemTimer; import java.util.logging.Level; import com.yahoo.messagebus.network.Network; import com.yahoo.messagebus.network.NetworkMultiplexer; import com.yahoo.messagebus.network.NetworkOwner; import com.yahoo.messagebus.routing.Resender; import com.yahoo.messagebus.routing.RetryPolicy; import com.yahoo.messagebus.routing.RoutingPolicy; import com.yahoo.messagebus.routing.RoutingSpec; import com.yahoo.messagebus.routing.RoutingTable; import com.yahoo.messagebus.routing.RoutingTableSpec; import com.yahoo.text.Utf8Array; import com.yahoo.text.Utf8String; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Logger; /** *

A message bus contains the factory for creating sessions to send, receive * and forward messages.

* *

There are three types of sessions:

* * *

A message bus is configured with a {@link Protocol protocol}. This table * enumerates the permissible routes from intermediates to destinations and the * messaging semantics of each hop.

* * The responsibilities of a message bus are: * * * A runtime will typically * * * @author bratseth * @author Simon Thoresen Hult */ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler, ReplyHandler { private final static Logger log = Logger.getLogger(MessageBus.class.getName()); private final AtomicBoolean destroyed = new AtomicBoolean(false); private final ProtocolRepository protocolRepository = new ProtocolRepository(); private final AtomicReference> tablesRef = new AtomicReference<>(null); private final Map sessions = new ConcurrentHashMap<>(); private final NetworkMultiplexer net; private final Messenger msn; private final Resender resender; private int maxPendingCount; private int maxPendingSize; private int pendingCount = 0; private int pendingSize = 0; private final Thread careTaker = new Thread(this::sendBlockedMessages); private final Map blockedSenders = new ConcurrentHashMap<>(); public interface SendBlockedMessages { /** * Do what you want, but dont block. * You will be called regularly until you signal you are done * @return true unless you are done */ boolean trySend(); } public void register(SendBlockedMessages sender) { blockedSenders.put(sender, SystemTimer.INSTANCE.milliTime()); } private void sendBlockedMessages() { int timeout = 10*1000/SystemTimer.detectHz(); while (! destroyed.get()) { for (SendBlockedMessages sender : blockedSenders.keySet()) { if (!sender.trySend()) { blockedSenders.remove(sender); } } try { Thread.sleep(timeout); } catch (InterruptedException e) { return; } } } /** *

Convenience constructor that proxies {@link #MessageBus(Network, * MessageBusParams)} by adding the given protocols to a default {@link * MessageBusParams} object.

* * @param net The network to associate with. * @param protocols An array of protocols to register. */ public MessageBus(Network net, List protocols) { this(net, new MessageBusParams().addProtocols(protocols)); } /** *

Constructs an instance of message bus. This requires a network object * that it will associate with. This assignment may not change during the lifetime * of this message bus, and this bus will be the single owner of this net.

* * @param net The network to associate with. * @param params The parameters that controls this bus. */ public MessageBus(Network net, MessageBusParams params) { this(NetworkMultiplexer.dedicated(net), params); } /** *

Constructs an instance of message bus. This requires a network multiplexer * that it will associate with. This assignment may not change during the * lifetime of this message bus.

* * @param net The network multiplexer to associate with. * @param params The parameters that controls this bus. */ public MessageBus(NetworkMultiplexer net, MessageBusParams params) { // Add all known protocols to the repository. maxPendingCount = params.getMaxPendingCount(); maxPendingSize = params.getMaxPendingSize(); for (int i = 0, len = params.getNumProtocols(); i < len; ++i) { protocolRepository.putProtocol(params.getProtocol(i)); } // Attach and start network. this.net = net; net.attach(this); if ( ! net.net().waitUntilReady(120)) throw new IllegalStateException("Network failed to become ready in time."); // Start messenger. msn = new Messenger(); RetryPolicy retryPolicy = params.getRetryPolicy(); if (retryPolicy != null) { resender = new Resender(retryPolicy); msn.addRecurrentTask(new ResenderTask(resender)); } else { resender = null; } careTaker.setDaemon(true); careTaker.start(); msn.start(); } /** *

Sets the destroyed flag to true. The very first time this method is * called, it cleans up all its dependencies. Even if you retain a reference * to this object, all of its content is allowed to be garbage * collected.

* * @return True if content existed and was destroyed. */ public boolean destroy() { if (!destroyed.getAndSet(true)) { try { careTaker.join(); } catch (InterruptedException e) { } protocolRepository.clearPolicyCache(); net.detach(this); msn.destroy(); if (resender != null) { resender.destroy(); } return true; } return false; } /** *

Synchronize with internal threads. This method will handshake with all * internal threads. This has the implicit effect of waiting for all active * callbacks. Note that this method should never be invoked from a callback * since that would make the thread wait for itself... forever. This method * is typically used to untangle during session shutdown.

*/ public void sync() { msn.sync(); net.net().sync(); } /** *

This is a convenience method to call {@link * #createSourceSession(SourceSessionParams)} with default values for the * {@link SourceSessionParams} object.

* * @param handler The reply handler to receive the replies for the session. * @return The created session. */ public SourceSession createSourceSession(ReplyHandler handler) { return createSourceSession(new SourceSessionParams().setReplyHandler(handler)); } /** *

This is a convenience method to call {@link * #createSourceSession(SourceSessionParams)} by first assigning the reply * handler to the parameter object.

* * @param handler The reply handler to receive the replies for the session. * @param params The parameters to control the session. * @return The created session. */ public SourceSession createSourceSession(ReplyHandler handler, SourceSessionParams params) { return createSourceSession(new SourceSessionParams(params).setReplyHandler(handler)); } /** *

Creates a source session on top of this message bus.

* * @param params The parameters to control the session. * @return The created session. */ public SourceSession createSourceSession(SourceSessionParams params) { if (destroyed.get()) { throw new IllegalStateException("Object is destroyed."); } return new SourceSession(this, params); } /** *

This is a convenience method to call {@link * #createIntermediateSession(IntermediateSessionParams)} with default * values for the {@link IntermediateSessionParams} object.

* * @param name The local unique name for the created session. * @param broadcastName Whether or not to broadcast this session's name on * the network. * @param msgHandler The handler to receive the messages for the session. * @param replyHandler The handler to received the replies for the session. * @return The created session. */ public IntermediateSession createIntermediateSession(String name, boolean broadcastName, MessageHandler msgHandler, ReplyHandler replyHandler) { return createIntermediateSession( new IntermediateSessionParams() .setName(name) .setBroadcastName(broadcastName) .setMessageHandler(msgHandler) .setReplyHandler(replyHandler)); } /** *

Creates an intermediate session on top of this message bus using the * given handlers and parameter object.

* * @param params The parameters to control the session. * @return The created session. */ public synchronized IntermediateSession createIntermediateSession(IntermediateSessionParams params) { IntermediateSession session = createDetachedIntermediateSession(params); connect(params.getName(), params.getBroadcastName()); return session; } public synchronized IntermediateSession createDetachedIntermediateSession(IntermediateSessionParams params) { if (destroyed.get()) { throw new IllegalStateException("Object is destroyed."); } if (sessions.containsKey(params.getName())) { throw new IllegalArgumentException("Name '" + params.getName() + "' is not unique."); } IntermediateSession session = new IntermediateSession(this, params); sessions.put(params.getName(), session); return session; } /** *

This is a convenience method to call {@link * #createDestinationSession(DestinationSessionParams)} with default values * for the {@link DestinationSessionParams} object.

* * @param name The local unique name for the created session. * @param broadcastName Whether or not to broadcast this session's name on * the network. * @param handler The handler to receive the messages for the session. * @return The created session. */ public DestinationSession createDestinationSession(String name, boolean broadcastName, MessageHandler handler) { return createDestinationSession( new DestinationSessionParams() .setName(name) .setBroadcastName(broadcastName) .setMessageHandler(handler)); } /** *

Creates a destination session on top of this message bus using the * given handlers and parameter object.

* * @param params The parameters to control the session. * @return The created session. */ public synchronized DestinationSession createDestinationSession(DestinationSessionParams params) { DestinationSession session = createDetachedDestinationSession(params); connect(params.getName(), params.getBroadcastName()); return session; } public synchronized DestinationSession createDetachedDestinationSession(DestinationSessionParams params) { if (destroyed.get()) { throw new IllegalStateException("Object is destroyed."); } if (sessions.containsKey(params.getName())) { throw new IllegalArgumentException("Name '" + params.getName() + "' is not unique."); } DestinationSession session = new DestinationSession(this, params); sessions.put(params.getName(), session); return session; } /** Connects the given session to the network, so it will receive requests. */ public void connect(String session, boolean broadcast) { net.registerSession(session, this, broadcast); } /** *

This method is invoked by the {@link * com.yahoo.messagebus.IntermediateSession#destroy()} to unregister * sessions from receiving data from message bus.

* * @param name The name of the session to remove. * @param broadcastName Whether or not session name was broadcast. */ public synchronized void unregisterSession(String name, boolean broadcastName) { net.unregisterSession(name, this, broadcastName); sessions.remove(name); } private boolean doAccounting() { return (maxPendingCount > 0 || maxPendingSize > 0); } /** *

This method handles choking input data so that message bus does not * blindly accept everything. This prevents an application running * out-of-memory in case it fail to choke input data itself. If this method * returns false, it means that it should be rejected.

* * @param msg The message to count. * @return True if the message was accepted. */ private boolean checkPending(Message msg) { boolean busy = false; int size = msg.getApproxSize(); if (doAccounting()) { synchronized (this) { busy = ((maxPendingCount > 0 && pendingCount >= maxPendingCount) || (maxPendingSize > 0 && pendingSize >= maxPendingSize)); if (!busy) { pendingCount++; pendingSize += size; } } } if (busy) { return false; } msg.setContext(size); msg.pushHandler(this); return true; } @Override public void handleMessage(Message msg) { if (resender != null && msg.hasBucketSequence()) { deliverError(msg, ErrorCode.SEQUENCE_ERROR, "Bucket sequences not supported when resender is enabled."); return; } SendProxy proxy = new SendProxy(this, net.net(), resender); msn.deliverMessage(msg, proxy); } @Override public void handleReply(Reply reply) { if (destroyed.get()) { reply.discard(); return; } if (doAccounting()) { synchronized (this) { --pendingCount; pendingSize -= (Integer)reply.getContext(); } } deliverReply(reply, reply.popHandler()); } @Override public void deliverMessage(Message msg, String session) { MessageHandler msgHandler = sessions.get(session); if (msgHandler == null) { deliverError(msg, ErrorCode.UNKNOWN_SESSION, "Session '" + session + "' does not exist."); } else if (!checkPending(msg)) { deliverError(msg, ErrorCode.SESSION_BUSY, "Session '" + net.net().getConnectionSpec() + "/" + session + "' is busy, try again later."); } else { msn.deliverMessage(msg, msgHandler); } } /** *

Adds a protocol to the internal repository of protocols, replacing any * previous instance of the protocol and clearing the associated routing * policy cache.

* * @param protocol The protocol to add. */ public void putProtocol(Protocol protocol) { protocolRepository.putProtocol(protocol); } @Override public Protocol getProtocol(Utf8Array name) { return protocolRepository.getProtocol(name.toString()); } public void deliverReply(Reply reply, ReplyHandler handler) { msn.deliverReply(reply, handler); } @Override public void setupRouting(RoutingSpec spec) { Map tables = new HashMap<>(); for (int i = 0, len = spec.getNumTables(); i < len; ++i) { RoutingTableSpec table = spec.getTable(i); String name = table.getProtocol(); if (!protocolRepository.hasProtocol(name)) { log.log(Level.INFO, "Protocol '" + name + "' is not supported, ignoring routing table."); continue; } tables.put(name, new RoutingTable(table)); } tablesRef.set(tables); protocolRepository.clearPolicyCache(); } /** *

Returns the resender that is running within this message bus.

* * @return The resender. */ public Resender getResender() { return resender; } /** *

Returns the number of messages received that have not been replied to * yet.

* * @return The pending count. */ public synchronized int getPendingCount() { return pendingCount; } /** *

Returns the size of messages received that have not been replied to * yet.

* * @return The pending size. */ public synchronized int getPendingSize() { return pendingSize; } /** *

Sets the maximum number of messages that can be received without being * replied to yet.

* * @param maxCount The max count. */ public void setMaxPendingCount(int maxCount) { maxPendingCount = maxCount; } /** * Gets maximum number of messages that can be received without being * replied to yet. */ public int getMaxPendingCount() { return maxPendingCount; } /** *

Sets the maximum size of messages that can be received without being * replied to yet.

* * @param maxSize The max size. */ public void setMaxPendingSize(int maxSize) { maxPendingSize = maxSize; } /** * Gets maximum combined size of messages that can be received without * being replied to yet. */ public int getMaxPendingSize() { return maxPendingSize; } /** *

Returns a named routing table, may return null.

* * @param name The name of the routing table to return. * @return The routing table object. */ public RoutingTable getRoutingTable(String name) { Map tables = tablesRef.get(); if (tables == null) { return null; } return tables.get(name); } /** *

Returns a named routing table, may return null.

* * @param name The name of the routing table to return. * @return The routing table object. */ public RoutingTable getRoutingTable(Utf8String name) { return getRoutingTable(name.toString()); } /** *

Returns a routing policy that corresponds to the argument protocol * name, policy name and policy parameter. This will cache reuse all * policies as soon as they are first requested.

* * @param protocolName The name of the protocol to invoke {@link Protocol#createPolicy(String,String)} on. * @param policyName The name of the routing policy to retrieve. * @param policyParam The parameter for the routing policy to retrieve. * @return A corresponding routing policy, or null. */ public RoutingPolicy getRoutingPolicy(String protocolName, String policyName, String policyParam) { return protocolRepository.getRoutingPolicy(protocolName, policyName, policyParam); } /** *

Returns a routing policy that corresponds to the argument protocol * name, policy name and policy parameter. This will cache reuse all * policies as soon as they are first requested.

* * @param protocolName The name of the protocol to invoke {@link Protocol#createPolicy(String,String)} on. * @param policyName The name of the routing policy to retrieve. * @param policyParam The parameter for the routing policy to retrieve. * @return A corresponding routing policy, or null. */ public RoutingPolicy getRoutingPolicy(Utf8String protocolName, String policyName, String policyParam) { return protocolRepository.getRoutingPolicy(protocolName.toString(), policyName, policyParam); } /** *

Returns the connection spec string for the network layer of this * message bus. This is merely a proxy of the same function in the network * layer.

* * @return The connection string. */ public String getConnectionSpec() { return net.net().getConnectionSpec(); } /** *

Constructs and schedules a Reply containing an error to the handler of the given Message.

* * @param msg The message to reply to. * @param errCode The code of the error to set. * @param errMsg The message of the error to set. */ private void deliverError(Message msg, int errCode, String errMsg) { Reply reply = new EmptyReply(); reply.swapState(msg); reply.addError(new Error(errCode, errMsg)); deliverReply(reply, reply.popHandler()); } /** *

Implements a task for running the resender in the messenger * thread. This task acts as a proxy for the resender, allowing the task to * be deleted without affecting the resender itself.

*/ private static class ResenderTask implements Messenger.Task { final Resender resender; ResenderTask(Resender resender) { this.resender = resender; } public void destroy() { // empty } public void run() { resender.resendScheduled(); } } }