diff options
author | Jon Bratseth <jonbratseth@yahoo.com> | 2016-09-29 23:28:23 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2016-09-29 23:28:23 +0200 |
commit | 8aeed8886002cc6ad9261da57763766cad5ca7bf (patch) | |
tree | 814bc1c016c679e67306101e7b409ce7c4d2272b /messagebus/src | |
parent | 151d1cf4c09fc24c39d665a0818c64ff7bf80716 (diff) | |
parent | 0d27136b2912ce0dd6ca1c6cc5341ebeba71dc12 (diff) |
Merge pull request #738 from yahoo/bratseth/support-document-api-in-applications
Bratseth/support document api in applications
Diffstat (limited to 'messagebus/src')
6 files changed, 135 insertions, 97 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java b/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java index 729bef7985f..cf5beb4a903 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java @@ -22,33 +22,34 @@ import java.util.logging.Logger; * and forward messages.</p> * * <p>There are three types of sessions:</p> - * <ul><li>{@link SourceSession Source sessions} sends messages and receives - * replies</li> - * <li>{@link IntermediateSession Intermediate sessions} receives messages on - * their way to their final destination, and may decide to forward the messages - * or reply directly.</li> - * <li>{@link DestinationSession Destination sessions} are the final recipient - * of messages, and are expected to reply to every one of them, but may not - * forward messages.</li></ul> + * <ul> + * <li>{@link SourceSession Source sessions} sends messages and receives replies</li> + * <li>{@link IntermediateSession Intermediate sessions} receives messages on + * their way to their final destination, and may decide to forward the messages or reply directly. + * <li>{@link DestinationSession Destination sessions} are the final recipient + * of messages, and are expected to reply to every one of them, but may not forward messages. + * </ul> * * <p>A message bus is configured with a {@link Protocol protocol}. This table * enumerates the permissible routes from intermediates to destinations and the * messaging semantics of each hop.</p> * - * <p>The responsibilities of a message bus are:</p> - * <ul> <li>Assign a route to every send message from its routing table</li> - * <li>Deliver every message it <i>accepts</i> to the next hop on its route on a - * best effort basis, <i>or</i> deliver a <i>failure reply</i>.</li> - * <li>Deliver replies back to message sources through all the intermediate - * hops.</li></ul> + * The responsibilities of a message bus are: + * <ul> + * <li>Assign a route to every send message from its routing table + * <li>Deliver every message it <i>accepts</i> to the next hop on its route + * <i>or</i> deliver a <i>failure reply</i>. + * <li>Deliver replies back to message sources through all the intermediate hops. + * </ul> * - * <p>A runtime will typically</p> - * <ul><li>Create a message bus implementation and set properties on this - * implementation once.</li> - * <li>Create sessions using that message bus many places.</li></ul> + * A runtime will typically + * <ul> + * <li>Create a message bus implementation and set properties on this implementation once. + * <li>Create sessions using that message bus many places.</li> + * </ul> * - * @author btratseth - * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + * @author bratseth + * @author Simon Thoresen */ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler, ReplyHandler { @@ -101,9 +102,8 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler, // Attach and start network. this.net = net; net.attach(this); - if (!net.waitUntilReady(120)) { + if ( ! net.waitUntilReady(120)) throw new IllegalStateException("Network failed to become ready in time."); - } // Start messenger. msn = new Messenger(); diff --git a/messagebus/src/main/java/com/yahoo/messagebus/NetworkMessageBus.java b/messagebus/src/main/java/com/yahoo/messagebus/NetworkMessageBus.java new file mode 100644 index 00000000000..24e177f1fbf --- /dev/null +++ b/messagebus/src/main/java/com/yahoo/messagebus/NetworkMessageBus.java @@ -0,0 +1,43 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus; + +import com.yahoo.messagebus.network.Network; + +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * The combination of a messagebus and a network over which it may send data. + * + * @author bratseth + */ +public class NetworkMessageBus { + + private final Network network; + private final MessageBus messageBus; + + private final AtomicBoolean destroyed = new AtomicBoolean(false); + + public NetworkMessageBus(Network network, MessageBus messageBus) { + this.network = network; + this.messageBus = messageBus; + } + + /** Returns the contained message bus object */ + public MessageBus getMessageBus() { return messageBus; } + + /** Returns the network of this as a Network */ + public Network getNetwork() { return network; } + + /** + * Irreversibly destroys the content of this. + * + * @return whether this destroyed anything, or if it was already destroyed + */ + public boolean destroy() { + if ( destroyed.getAndSet(true)) return false; + + getMessageBus().destroy(); + return true; + } + +} diff --git a/messagebus/src/main/java/com/yahoo/messagebus/RPCMessageBus.java b/messagebus/src/main/java/com/yahoo/messagebus/RPCMessageBus.java index d767e197b11..cfa50a35549 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/RPCMessageBus.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/RPCMessageBus.java @@ -3,6 +3,7 @@ package com.yahoo.messagebus; import com.yahoo.log.LogLevel; import com.yahoo.messagebus.network.Identity; +import com.yahoo.messagebus.network.Network; import com.yahoo.messagebus.network.rpc.RPCNetwork; import com.yahoo.messagebus.network.rpc.RPCNetworkParams; @@ -17,12 +18,9 @@ import java.util.logging.Logger; * * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> */ -public class RPCMessageBus { +public class RPCMessageBus extends NetworkMessageBus { private static final Logger log = Logger.getLogger(RPCMessageBus.class.getName()); - private final AtomicBoolean destroyed = new AtomicBoolean(false); - private final MessageBus mbus; - private final RPCNetwork net; private final ConfigAgent configAgent; /** @@ -33,9 +31,16 @@ public class RPCMessageBus { * @param routingCfgId The config id for message bus routing specs. */ public RPCMessageBus(MessageBusParams mbusParams, RPCNetworkParams rpcParams, String routingCfgId) { - net = new RPCNetwork(rpcParams); - mbus = new MessageBus(net, mbusParams); - configAgent = new ConfigAgent(routingCfgId != null ? routingCfgId : "client", mbus); + this(mbusParams, new RPCNetwork(rpcParams), routingCfgId); + } + + private RPCMessageBus(MessageBusParams mbusParams, RPCNetwork network, String routingCfgId) { + this(new MessageBus(network, mbusParams), network, routingCfgId); + } + + private RPCMessageBus(MessageBus messageBus, RPCNetwork network, String routingCfgId) { + super(network, messageBus); + configAgent = new ConfigAgent(routingCfgId != null ? routingCfgId : "client", messageBus); configAgent.subscribe(); } @@ -80,33 +85,17 @@ public class RPCMessageBus { * Sets the destroyed flag to true. The very first time this method is called, it cleans up all its dependencies. * Even if you retain a reference to this object, all of its content is allowed to be garbage collected. * - * @return True if content existed and was destroyed. + * @return true if content existed and was destroyed. */ + @Override public boolean destroy() { - if (!destroyed.getAndSet(true)) { + boolean destroyed = super.destroy(); + if (destroyed) configAgent.shutdown(); - mbus.destroy(); - return true; - } - return false; + return destroyed; } - /** - * Returns the contained message bus object. - * - * @return Message bus. - */ - public MessageBus getMessageBus() { - return mbus; - } - - /** - * Returns the contained rpc network object. - * - * @return RPC network. - */ - public RPCNetwork getRPCNetwork() { - return net; - } + /** Returns the network of this as a RPCNetwork */ + public RPCNetwork getRPCNetwork() { return (RPCNetwork)getNetwork(); } } diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/Network.java b/messagebus/src/main/java/com/yahoo/messagebus/network/Network.java index cd3b3286778..b0bbe4266c4 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/network/Network.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/network/Network.java @@ -21,28 +21,28 @@ public interface Network { * @param seconds The timeout. * @return True if ready. */ - public boolean waitUntilReady(double seconds); + boolean waitUntilReady(double seconds); /** * Attach the network layer to the given owner * * @param owner owner of the network */ - public void attach(NetworkOwner owner); + void attach(NetworkOwner owner); /** * Register a session name with the network layer. This will make the session visible to other nodes. * * @param session the session name */ - public void registerSession(String session); + void registerSession(String session); /** * Unregister a session name with the network layer. This will make the session unavailable for other nodes. * * @param session session name */ - public void unregisterSession(String session); + void unregisterSession(String session); /** * Resolves the service address of the recipient referenced by the given routing node. If a recipient can not be @@ -52,7 +52,7 @@ public interface Network { * @param recipient The node whose service address to allocate. * @return True if a service address was allocated. */ - public boolean allocServiceAddress(RoutingNode recipient); + boolean allocServiceAddress(RoutingNode recipient); /** * Frees the service address from the given routing node. This allows the network layer to track and close @@ -60,7 +60,7 @@ public interface Network { * * @param recipient The node whose service address to free. */ - public void freeServiceAddress(RoutingNode recipient); + void freeServiceAddress(RoutingNode recipient); /** * Send a message to the given recipients. A {@link RoutingNode} contains all the necessary context for sending. @@ -68,7 +68,7 @@ public interface Network { * @param msg The message to send. * @param recipients A list of routing leaf nodes resolved for the message. */ - public void send(Message msg, List<RoutingNode> recipients); + void send(Message msg, List<RoutingNode> recipients); /** * Synchronize with internal threads. This method will handshake with all internal threads. This has the implicit @@ -76,12 +76,12 @@ public interface Network { * that would make the thread wait for itself... forever. This method is typically used to untangle during session * shutdown. */ - public void sync(); + void sync(); /** * Shuts down the network. This is a blocking call that waits for all scheduled tasks to complete. */ - public void shutdown(); + void shutdown(); /** * Returns a string that represents the connection specs of this network. It is in not a complete address since it @@ -89,12 +89,13 @@ public interface Network { * * @return The connection string. */ - public String getConnectionSpec(); + String getConnectionSpec(); /** * Returns a reference to a name server mirror. * * @return The mirror object. */ - public IMirror getMirror(); + IMirror getMirror(); + } diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalNetwork.java b/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalNetwork.java index ffcb853a0a7..78cf352cfbf 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalNetwork.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalNetwork.java @@ -23,7 +23,7 @@ import java.util.concurrent.Executors; import static com.yahoo.messagebus.ErrorCode.NO_ADDRESS_FOR_SERVICE; /** - * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen Hult</a> + * @author Simon Thoresen Hult */ public class LocalNetwork implements Network { @@ -32,35 +32,39 @@ public class LocalNetwork implements Network { private final String hostId; private volatile NetworkOwner owner; - public LocalNetwork(final LocalWire wire) { + public LocalNetwork() { + this(new LocalWire()); + } + + public LocalNetwork(LocalWire wire) { this.wire = wire; this.hostId = wire.newHostId(); } @Override - public boolean waitUntilReady(final double seconds) { + public boolean waitUntilReady(double seconds) { return true; } @Override - public void attach(final NetworkOwner owner) { + public void attach(NetworkOwner owner) { this.owner = owner; } @Override - public void registerSession(final String session) { + public void registerSession(String session) { wire.registerService(hostId + "/" + session, this); } @Override - public void unregisterSession(final String session) { + public void unregisterSession(String session) { wire.unregisterService(hostId + "/" + session); } @Override - public boolean allocServiceAddress(final RoutingNode recipient) { - final String service = recipient.getRoute().getHop(0).getServiceName(); - final ServiceAddress address = wire.resolveServiceAddress(service); + public boolean allocServiceAddress(RoutingNode recipient) { + String service = recipient.getRoute().getHop(0).getServiceName(); + ServiceAddress address = wire.resolveServiceAddress(service); if (address == null) { recipient.setError(new Error(NO_ADDRESS_FOR_SERVICE, "No address for service '" + service + "'.")); return false; @@ -70,24 +74,24 @@ public class LocalNetwork implements Network { } @Override - public void freeServiceAddress(final RoutingNode recipient) { + public void freeServiceAddress(RoutingNode recipient) { recipient.setServiceAddress(null); } @Override - public void send(final Message msg, final List<RoutingNode> recipients) { - for (final RoutingNode recipient : recipients) { + public void send(Message msg, List<RoutingNode> recipients) { + for (RoutingNode recipient : recipients) { new MessageEnvelope(this, msg, recipient).send(); } } - private void receiveLater(final MessageEnvelope envelope) { - final byte[] payload = envelope.sender.encode(envelope.msg.getProtocol(), envelope.msg); + private void receiveLater(MessageEnvelope envelope) { + byte[] payload = envelope.sender.encode(envelope.msg.getProtocol(), envelope.msg); executor.execute(new Runnable() { @Override public void run() { - final Message msg = decode(envelope.msg.getProtocol(), payload, Message.class); + Message msg = decode(envelope.msg.getProtocol(), payload, Message.class); msg.getTrace().setLevel(envelope.msg.getTrace().getLevel()); msg.setRoute(envelope.msg.getRoute()).getRoute().removeHop(0); msg.setRetryEnabled(envelope.msg.getRetryEnabled()); @@ -96,7 +100,7 @@ public class LocalNetwork implements Network { msg.pushHandler(new ReplyHandler() { @Override - public void handleReply(final Reply reply) { + public void handleReply(Reply reply) { new ReplyEnvelope(LocalNetwork.this, envelope, reply).send(); } }); @@ -106,17 +110,17 @@ public class LocalNetwork implements Network { }); } - private void receiveLater(final ReplyEnvelope envelope) { - final byte[] payload = envelope.sender.encode(envelope.reply.getProtocol(), envelope.reply); + private void receiveLater(ReplyEnvelope envelope) { + byte[] payload = envelope.sender.encode(envelope.reply.getProtocol(), envelope.reply); executor.execute(new Runnable() { @Override public void run() { - final Reply reply = decode(envelope.reply.getProtocol(), payload, Reply.class); + Reply reply = decode(envelope.reply.getProtocol(), payload, Reply.class); reply.setRetryDelay(envelope.reply.getRetryDelay()); reply.getTrace().getRoot().addChild(TraceNode.decode(envelope.reply.getTrace().getRoot().encode())); for (int i = 0, len = envelope.reply.getNumErrors(); i < len; ++i) { - final Error error = envelope.reply.getError(i); + Error error = envelope.reply.getError(i); reply.addError(new Error(error.getCode(), error.getMessage(), error.getService() != null ? error.getService() : envelope.sender.hostId)); @@ -126,7 +130,7 @@ public class LocalNetwork implements Network { }); } - private byte[] encode(final Utf8String protocolName, final Routable toEncode) { + private byte[] encode(Utf8String protocolName, Routable toEncode) { if (toEncode.getType() == 0) { return new byte[0]; } @@ -134,7 +138,7 @@ public class LocalNetwork implements Network { } @SuppressWarnings("unchecked") - private <T extends Routable> T decode(final Utf8String protocolName, final byte[] toDecode, final Class<T> clazz) { + private <T extends Routable> T decode(Utf8String protocolName, byte[] toDecode, Class<T> clazz) { if (toDecode.length == 0) { return clazz.cast(new EmptyReply()); } @@ -167,15 +171,14 @@ public class LocalNetwork implements Network { final Message msg; final RoutingNode recipient; - MessageEnvelope(final LocalNetwork sender, final Message msg, final RoutingNode recipient) { + MessageEnvelope(LocalNetwork sender, Message msg, RoutingNode recipient) { this.sender = sender; this.msg = msg; this.recipient = recipient; } void send() { - LocalServiceAddress.class.cast(recipient.getServiceAddress()) - .getNetwork().receiveLater(this); + LocalServiceAddress.class.cast(recipient.getServiceAddress()).getNetwork().receiveLater(this); } } @@ -185,7 +188,7 @@ public class LocalNetwork implements Network { final MessageEnvelope parent; final Reply reply; - ReplyEnvelope(final LocalNetwork sender, final MessageEnvelope parent, final Reply reply) { + ReplyEnvelope(LocalNetwork sender, MessageEnvelope parent, Reply reply) { this.sender = sender; this.parent = parent; this.reply = reply; @@ -195,4 +198,5 @@ public class LocalNetwork implements Network { parent.sender.receiveLater(this); } } + } diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalWire.java b/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalWire.java index 84ca8c64bc0..5c9035a5f99 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalWire.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalWire.java @@ -11,7 +11,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.regex.Pattern; /** - * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen Hult</a> + * @author Simon Thoresen Hult */ public class LocalWire implements IMirror { @@ -19,19 +19,19 @@ public class LocalWire implements IMirror { private final AtomicInteger updateCnt = new AtomicInteger(); private final ConcurrentHashMap<String, LocalNetwork> services = new ConcurrentHashMap<>(); - public void registerService(final String serviceName, final LocalNetwork owner) { + public void registerService(String serviceName, LocalNetwork owner) { if (services.putIfAbsent(serviceName, owner) != null) { throw new IllegalStateException(); } updateCnt.incrementAndGet(); } - public void unregisterService(final String serviceName) { + public void unregisterService(String serviceName) { services.remove(serviceName); updateCnt.incrementAndGet(); } - public LocalServiceAddress resolveServiceAddress(final String serviceName) { + public LocalServiceAddress resolveServiceAddress(String serviceName) { final LocalNetwork owner = services.get(serviceName); return owner != null ? new LocalServiceAddress(serviceName, owner) : null; } @@ -41,10 +41,10 @@ public class LocalWire implements IMirror { } @Override - public Mirror.Entry[] lookup(final String pattern) { - final List<Mirror.Entry> out = new ArrayList<>(); - final Pattern regex = Pattern.compile(pattern.replace("*", "[a-zA-Z0-9_-]+")); - for (final String key : services.keySet()) { + public Mirror.Entry[] lookup(String pattern) { + List<Mirror.Entry> out = new ArrayList<>(); + Pattern regex = Pattern.compile(pattern.replace("*", "[a-zA-Z0-9_-]+")); + for (String key : services.keySet()) { if (regex.matcher(key).matches()) { out.add(new Mirror.Entry(key, key)); } @@ -56,4 +56,5 @@ public class LocalWire implements IMirror { public int updates() { return updateCnt.get(); } + } |