summaryrefslogtreecommitdiffstats
path: root/messagebus/src
diff options
context:
space:
mode:
authorJon Bratseth <jonbratseth@yahoo.com>2016-09-29 23:28:23 +0200
committerGitHub <noreply@github.com>2016-09-29 23:28:23 +0200
commit8aeed8886002cc6ad9261da57763766cad5ca7bf (patch)
tree814bc1c016c679e67306101e7b409ce7c4d2272b /messagebus/src
parent151d1cf4c09fc24c39d665a0818c64ff7bf80716 (diff)
parent0d27136b2912ce0dd6ca1c6cc5341ebeba71dc12 (diff)
Merge pull request #738 from yahoo/bratseth/support-document-api-in-applications
Bratseth/support document api in applications
Diffstat (limited to 'messagebus/src')
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java44
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/NetworkMessageBus.java43
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/RPCMessageBus.java49
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/network/Network.java23
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalNetwork.java56
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalWire.java17
6 files changed, 135 insertions, 97 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java b/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java
index 729bef7985f..cf5beb4a903 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java
@@ -22,33 +22,34 @@ import java.util.logging.Logger;
* and forward messages.</p>
*
* <p>There are three types of sessions:</p>
- * <ul><li>{@link SourceSession Source sessions} sends messages and receives
- * replies</li>
- * <li>{@link IntermediateSession Intermediate sessions} receives messages on
- * their way to their final destination, and may decide to forward the messages
- * or reply directly.</li>
- * <li>{@link DestinationSession Destination sessions} are the final recipient
- * of messages, and are expected to reply to every one of them, but may not
- * forward messages.</li></ul>
+ * <ul>
+ * <li>{@link SourceSession Source sessions} sends messages and receives replies</li>
+ * <li>{@link IntermediateSession Intermediate sessions} receives messages on
+ * their way to their final destination, and may decide to forward the messages or reply directly.
+ * <li>{@link DestinationSession Destination sessions} are the final recipient
+ * of messages, and are expected to reply to every one of them, but may not forward messages.
+ * </ul>
*
* <p>A message bus is configured with a {@link Protocol protocol}. This table
* enumerates the permissible routes from intermediates to destinations and the
* messaging semantics of each hop.</p>
*
- * <p>The responsibilities of a message bus are:</p>
- * <ul> <li>Assign a route to every send message from its routing table</li>
- * <li>Deliver every message it <i>accepts</i> to the next hop on its route on a
- * best effort basis, <i>or</i> deliver a <i>failure reply</i>.</li>
- * <li>Deliver replies back to message sources through all the intermediate
- * hops.</li></ul>
+ * The responsibilities of a message bus are:
+ * <ul>
+ * <li>Assign a route to every send message from its routing table
+ * <li>Deliver every message it <i>accepts</i> to the next hop on its route
+ * <i>or</i> deliver a <i>failure reply</i>.
+ * <li>Deliver replies back to message sources through all the intermediate hops.
+ * </ul>
*
- * <p>A runtime will typically</p>
- * <ul><li>Create a message bus implementation and set properties on this
- * implementation once.</li>
- * <li>Create sessions using that message bus many places.</li></ul>
+ * A runtime will typically
+ * <ul>
+ * <li>Create a message bus implementation and set properties on this implementation once.
+ * <li>Create sessions using that message bus many places.</li>
+ * </ul>
*
- * @author btratseth
- * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ * @author bratseth
+ * @author Simon Thoresen
*/
public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler, ReplyHandler {
@@ -101,9 +102,8 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler,
// Attach and start network.
this.net = net;
net.attach(this);
- if (!net.waitUntilReady(120)) {
+ if ( ! net.waitUntilReady(120))
throw new IllegalStateException("Network failed to become ready in time.");
- }
// Start messenger.
msn = new Messenger();
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/NetworkMessageBus.java b/messagebus/src/main/java/com/yahoo/messagebus/NetworkMessageBus.java
new file mode 100644
index 00000000000..24e177f1fbf
--- /dev/null
+++ b/messagebus/src/main/java/com/yahoo/messagebus/NetworkMessageBus.java
@@ -0,0 +1,43 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus;
+
+import com.yahoo.messagebus.network.Network;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * The combination of a messagebus and a network over which it may send data.
+ *
+ * @author bratseth
+ */
+public class NetworkMessageBus {
+
+ private final Network network;
+ private final MessageBus messageBus;
+
+ private final AtomicBoolean destroyed = new AtomicBoolean(false);
+
+ public NetworkMessageBus(Network network, MessageBus messageBus) {
+ this.network = network;
+ this.messageBus = messageBus;
+ }
+
+ /** Returns the contained message bus object */
+ public MessageBus getMessageBus() { return messageBus; }
+
+ /** Returns the network of this as a Network */
+ public Network getNetwork() { return network; }
+
+ /**
+ * Irreversibly destroys the content of this.
+ *
+ * @return whether this destroyed anything, or if it was already destroyed
+ */
+ public boolean destroy() {
+ if ( destroyed.getAndSet(true)) return false;
+
+ getMessageBus().destroy();
+ return true;
+ }
+
+}
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/RPCMessageBus.java b/messagebus/src/main/java/com/yahoo/messagebus/RPCMessageBus.java
index d767e197b11..cfa50a35549 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/RPCMessageBus.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/RPCMessageBus.java
@@ -3,6 +3,7 @@ package com.yahoo.messagebus;
import com.yahoo.log.LogLevel;
import com.yahoo.messagebus.network.Identity;
+import com.yahoo.messagebus.network.Network;
import com.yahoo.messagebus.network.rpc.RPCNetwork;
import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
@@ -17,12 +18,9 @@ import java.util.logging.Logger;
*
* @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
*/
-public class RPCMessageBus {
+public class RPCMessageBus extends NetworkMessageBus {
private static final Logger log = Logger.getLogger(RPCMessageBus.class.getName());
- private final AtomicBoolean destroyed = new AtomicBoolean(false);
- private final MessageBus mbus;
- private final RPCNetwork net;
private final ConfigAgent configAgent;
/**
@@ -33,9 +31,16 @@ public class RPCMessageBus {
* @param routingCfgId The config id for message bus routing specs.
*/
public RPCMessageBus(MessageBusParams mbusParams, RPCNetworkParams rpcParams, String routingCfgId) {
- net = new RPCNetwork(rpcParams);
- mbus = new MessageBus(net, mbusParams);
- configAgent = new ConfigAgent(routingCfgId != null ? routingCfgId : "client", mbus);
+ this(mbusParams, new RPCNetwork(rpcParams), routingCfgId);
+ }
+
+ private RPCMessageBus(MessageBusParams mbusParams, RPCNetwork network, String routingCfgId) {
+ this(new MessageBus(network, mbusParams), network, routingCfgId);
+ }
+
+ private RPCMessageBus(MessageBus messageBus, RPCNetwork network, String routingCfgId) {
+ super(network, messageBus);
+ configAgent = new ConfigAgent(routingCfgId != null ? routingCfgId : "client", messageBus);
configAgent.subscribe();
}
@@ -80,33 +85,17 @@ public class RPCMessageBus {
* Sets the destroyed flag to true. The very first time this method is called, it cleans up all its dependencies.
* Even if you retain a reference to this object, all of its content is allowed to be garbage collected.
*
- * @return True if content existed and was destroyed.
+ * @return true if content existed and was destroyed.
*/
+ @Override
public boolean destroy() {
- if (!destroyed.getAndSet(true)) {
+ boolean destroyed = super.destroy();
+ if (destroyed)
configAgent.shutdown();
- mbus.destroy();
- return true;
- }
- return false;
+ return destroyed;
}
- /**
- * Returns the contained message bus object.
- *
- * @return Message bus.
- */
- public MessageBus getMessageBus() {
- return mbus;
- }
-
- /**
- * Returns the contained rpc network object.
- *
- * @return RPC network.
- */
- public RPCNetwork getRPCNetwork() {
- return net;
- }
+ /** Returns the network of this as a RPCNetwork */
+ public RPCNetwork getRPCNetwork() { return (RPCNetwork)getNetwork(); }
}
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/Network.java b/messagebus/src/main/java/com/yahoo/messagebus/network/Network.java
index cd3b3286778..b0bbe4266c4 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/network/Network.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/network/Network.java
@@ -21,28 +21,28 @@ public interface Network {
* @param seconds The timeout.
* @return True if ready.
*/
- public boolean waitUntilReady(double seconds);
+ boolean waitUntilReady(double seconds);
/**
* Attach the network layer to the given owner
*
* @param owner owner of the network
*/
- public void attach(NetworkOwner owner);
+ void attach(NetworkOwner owner);
/**
* Register a session name with the network layer. This will make the session visible to other nodes.
*
* @param session the session name
*/
- public void registerSession(String session);
+ void registerSession(String session);
/**
* Unregister a session name with the network layer. This will make the session unavailable for other nodes.
*
* @param session session name
*/
- public void unregisterSession(String session);
+ void unregisterSession(String session);
/**
* Resolves the service address of the recipient referenced by the given routing node. If a recipient can not be
@@ -52,7 +52,7 @@ public interface Network {
* @param recipient The node whose service address to allocate.
* @return True if a service address was allocated.
*/
- public boolean allocServiceAddress(RoutingNode recipient);
+ boolean allocServiceAddress(RoutingNode recipient);
/**
* Frees the service address from the given routing node. This allows the network layer to track and close
@@ -60,7 +60,7 @@ public interface Network {
*
* @param recipient The node whose service address to free.
*/
- public void freeServiceAddress(RoutingNode recipient);
+ void freeServiceAddress(RoutingNode recipient);
/**
* Send a message to the given recipients. A {@link RoutingNode} contains all the necessary context for sending.
@@ -68,7 +68,7 @@ public interface Network {
* @param msg The message to send.
* @param recipients A list of routing leaf nodes resolved for the message.
*/
- public void send(Message msg, List<RoutingNode> recipients);
+ void send(Message msg, List<RoutingNode> recipients);
/**
* Synchronize with internal threads. This method will handshake with all internal threads. This has the implicit
@@ -76,12 +76,12 @@ public interface Network {
* that would make the thread wait for itself... forever. This method is typically used to untangle during session
* shutdown.
*/
- public void sync();
+ void sync();
/**
* Shuts down the network. This is a blocking call that waits for all scheduled tasks to complete.
*/
- public void shutdown();
+ void shutdown();
/**
* Returns a string that represents the connection specs of this network. It is in not a complete address since it
@@ -89,12 +89,13 @@ public interface Network {
*
* @return The connection string.
*/
- public String getConnectionSpec();
+ String getConnectionSpec();
/**
* Returns a reference to a name server mirror.
*
* @return The mirror object.
*/
- public IMirror getMirror();
+ IMirror getMirror();
+
}
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalNetwork.java b/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalNetwork.java
index ffcb853a0a7..78cf352cfbf 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalNetwork.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalNetwork.java
@@ -23,7 +23,7 @@ import java.util.concurrent.Executors;
import static com.yahoo.messagebus.ErrorCode.NO_ADDRESS_FOR_SERVICE;
/**
- * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen Hult</a>
+ * @author Simon Thoresen Hult
*/
public class LocalNetwork implements Network {
@@ -32,35 +32,39 @@ public class LocalNetwork implements Network {
private final String hostId;
private volatile NetworkOwner owner;
- public LocalNetwork(final LocalWire wire) {
+ public LocalNetwork() {
+ this(new LocalWire());
+ }
+
+ public LocalNetwork(LocalWire wire) {
this.wire = wire;
this.hostId = wire.newHostId();
}
@Override
- public boolean waitUntilReady(final double seconds) {
+ public boolean waitUntilReady(double seconds) {
return true;
}
@Override
- public void attach(final NetworkOwner owner) {
+ public void attach(NetworkOwner owner) {
this.owner = owner;
}
@Override
- public void registerSession(final String session) {
+ public void registerSession(String session) {
wire.registerService(hostId + "/" + session, this);
}
@Override
- public void unregisterSession(final String session) {
+ public void unregisterSession(String session) {
wire.unregisterService(hostId + "/" + session);
}
@Override
- public boolean allocServiceAddress(final RoutingNode recipient) {
- final String service = recipient.getRoute().getHop(0).getServiceName();
- final ServiceAddress address = wire.resolveServiceAddress(service);
+ public boolean allocServiceAddress(RoutingNode recipient) {
+ String service = recipient.getRoute().getHop(0).getServiceName();
+ ServiceAddress address = wire.resolveServiceAddress(service);
if (address == null) {
recipient.setError(new Error(NO_ADDRESS_FOR_SERVICE, "No address for service '" + service + "'."));
return false;
@@ -70,24 +74,24 @@ public class LocalNetwork implements Network {
}
@Override
- public void freeServiceAddress(final RoutingNode recipient) {
+ public void freeServiceAddress(RoutingNode recipient) {
recipient.setServiceAddress(null);
}
@Override
- public void send(final Message msg, final List<RoutingNode> recipients) {
- for (final RoutingNode recipient : recipients) {
+ public void send(Message msg, List<RoutingNode> recipients) {
+ for (RoutingNode recipient : recipients) {
new MessageEnvelope(this, msg, recipient).send();
}
}
- private void receiveLater(final MessageEnvelope envelope) {
- final byte[] payload = envelope.sender.encode(envelope.msg.getProtocol(), envelope.msg);
+ private void receiveLater(MessageEnvelope envelope) {
+ byte[] payload = envelope.sender.encode(envelope.msg.getProtocol(), envelope.msg);
executor.execute(new Runnable() {
@Override
public void run() {
- final Message msg = decode(envelope.msg.getProtocol(), payload, Message.class);
+ Message msg = decode(envelope.msg.getProtocol(), payload, Message.class);
msg.getTrace().setLevel(envelope.msg.getTrace().getLevel());
msg.setRoute(envelope.msg.getRoute()).getRoute().removeHop(0);
msg.setRetryEnabled(envelope.msg.getRetryEnabled());
@@ -96,7 +100,7 @@ public class LocalNetwork implements Network {
msg.pushHandler(new ReplyHandler() {
@Override
- public void handleReply(final Reply reply) {
+ public void handleReply(Reply reply) {
new ReplyEnvelope(LocalNetwork.this, envelope, reply).send();
}
});
@@ -106,17 +110,17 @@ public class LocalNetwork implements Network {
});
}
- private void receiveLater(final ReplyEnvelope envelope) {
- final byte[] payload = envelope.sender.encode(envelope.reply.getProtocol(), envelope.reply);
+ private void receiveLater(ReplyEnvelope envelope) {
+ byte[] payload = envelope.sender.encode(envelope.reply.getProtocol(), envelope.reply);
executor.execute(new Runnable() {
@Override
public void run() {
- final Reply reply = decode(envelope.reply.getProtocol(), payload, Reply.class);
+ Reply reply = decode(envelope.reply.getProtocol(), payload, Reply.class);
reply.setRetryDelay(envelope.reply.getRetryDelay());
reply.getTrace().getRoot().addChild(TraceNode.decode(envelope.reply.getTrace().getRoot().encode()));
for (int i = 0, len = envelope.reply.getNumErrors(); i < len; ++i) {
- final Error error = envelope.reply.getError(i);
+ Error error = envelope.reply.getError(i);
reply.addError(new Error(error.getCode(),
error.getMessage(),
error.getService() != null ? error.getService() : envelope.sender.hostId));
@@ -126,7 +130,7 @@ public class LocalNetwork implements Network {
});
}
- private byte[] encode(final Utf8String protocolName, final Routable toEncode) {
+ private byte[] encode(Utf8String protocolName, Routable toEncode) {
if (toEncode.getType() == 0) {
return new byte[0];
}
@@ -134,7 +138,7 @@ public class LocalNetwork implements Network {
}
@SuppressWarnings("unchecked")
- private <T extends Routable> T decode(final Utf8String protocolName, final byte[] toDecode, final Class<T> clazz) {
+ private <T extends Routable> T decode(Utf8String protocolName, byte[] toDecode, Class<T> clazz) {
if (toDecode.length == 0) {
return clazz.cast(new EmptyReply());
}
@@ -167,15 +171,14 @@ public class LocalNetwork implements Network {
final Message msg;
final RoutingNode recipient;
- MessageEnvelope(final LocalNetwork sender, final Message msg, final RoutingNode recipient) {
+ MessageEnvelope(LocalNetwork sender, Message msg, RoutingNode recipient) {
this.sender = sender;
this.msg = msg;
this.recipient = recipient;
}
void send() {
- LocalServiceAddress.class.cast(recipient.getServiceAddress())
- .getNetwork().receiveLater(this);
+ LocalServiceAddress.class.cast(recipient.getServiceAddress()).getNetwork().receiveLater(this);
}
}
@@ -185,7 +188,7 @@ public class LocalNetwork implements Network {
final MessageEnvelope parent;
final Reply reply;
- ReplyEnvelope(final LocalNetwork sender, final MessageEnvelope parent, final Reply reply) {
+ ReplyEnvelope(LocalNetwork sender, MessageEnvelope parent, Reply reply) {
this.sender = sender;
this.parent = parent;
this.reply = reply;
@@ -195,4 +198,5 @@ public class LocalNetwork implements Network {
parent.sender.receiveLater(this);
}
}
+
}
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalWire.java b/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalWire.java
index 84ca8c64bc0..5c9035a5f99 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalWire.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalWire.java
@@ -11,7 +11,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
/**
- * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen Hult</a>
+ * @author Simon Thoresen Hult
*/
public class LocalWire implements IMirror {
@@ -19,19 +19,19 @@ public class LocalWire implements IMirror {
private final AtomicInteger updateCnt = new AtomicInteger();
private final ConcurrentHashMap<String, LocalNetwork> services = new ConcurrentHashMap<>();
- public void registerService(final String serviceName, final LocalNetwork owner) {
+ public void registerService(String serviceName, LocalNetwork owner) {
if (services.putIfAbsent(serviceName, owner) != null) {
throw new IllegalStateException();
}
updateCnt.incrementAndGet();
}
- public void unregisterService(final String serviceName) {
+ public void unregisterService(String serviceName) {
services.remove(serviceName);
updateCnt.incrementAndGet();
}
- public LocalServiceAddress resolveServiceAddress(final String serviceName) {
+ public LocalServiceAddress resolveServiceAddress(String serviceName) {
final LocalNetwork owner = services.get(serviceName);
return owner != null ? new LocalServiceAddress(serviceName, owner) : null;
}
@@ -41,10 +41,10 @@ public class LocalWire implements IMirror {
}
@Override
- public Mirror.Entry[] lookup(final String pattern) {
- final List<Mirror.Entry> out = new ArrayList<>();
- final Pattern regex = Pattern.compile(pattern.replace("*", "[a-zA-Z0-9_-]+"));
- for (final String key : services.keySet()) {
+ public Mirror.Entry[] lookup(String pattern) {
+ List<Mirror.Entry> out = new ArrayList<>();
+ Pattern regex = Pattern.compile(pattern.replace("*", "[a-zA-Z0-9_-]+"));
+ for (String key : services.keySet()) {
if (regex.matcher(key).matches()) {
out.add(new Mirror.Entry(key, key));
}
@@ -56,4 +56,5 @@ public class LocalWire implements IMirror {
public int updates() {
return updateCnt.get();
}
+
}