diff options
author | Jon Bratseth <bratseth@yahoo-inc.com> | 2016-09-28 15:00:27 +0200 |
---|---|---|
committer | Jon Bratseth <bratseth@yahoo-inc.com> | 2016-09-28 15:00:27 +0200 |
commit | f9dd06806afef90b069f4fa0c99f77beb040851d (patch) | |
tree | ecce12a3af04d80e5d39b0ebce919aa5c0fef81b /messagebus/src/main/java/com | |
parent | 321194bbe4a36d100d92188644d90c6cf4d373c7 (diff) |
Support document-api in application
Diffstat (limited to 'messagebus/src/main/java/com')
4 files changed, 69 insertions, 67 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java b/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java index 729bef7985f..cf5beb4a903 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java @@ -22,33 +22,34 @@ import java.util.logging.Logger; * and forward messages.</p> * * <p>There are three types of sessions:</p> - * <ul><li>{@link SourceSession Source sessions} sends messages and receives - * replies</li> - * <li>{@link IntermediateSession Intermediate sessions} receives messages on - * their way to their final destination, and may decide to forward the messages - * or reply directly.</li> - * <li>{@link DestinationSession Destination sessions} are the final recipient - * of messages, and are expected to reply to every one of them, but may not - * forward messages.</li></ul> + * <ul> + * <li>{@link SourceSession Source sessions} sends messages and receives replies</li> + * <li>{@link IntermediateSession Intermediate sessions} receives messages on + * their way to their final destination, and may decide to forward the messages or reply directly. + * <li>{@link DestinationSession Destination sessions} are the final recipient + * of messages, and are expected to reply to every one of them, but may not forward messages. + * </ul> * * <p>A message bus is configured with a {@link Protocol protocol}. This table * enumerates the permissible routes from intermediates to destinations and the * messaging semantics of each hop.</p> * - * <p>The responsibilities of a message bus are:</p> - * <ul> <li>Assign a route to every send message from its routing table</li> - * <li>Deliver every message it <i>accepts</i> to the next hop on its route on a - * best effort basis, <i>or</i> deliver a <i>failure reply</i>.</li> - * <li>Deliver replies back to message sources through all the intermediate - * hops.</li></ul> + * The responsibilities of a message bus are: + * <ul> + * <li>Assign a route to every send message from its routing table + * <li>Deliver every message it <i>accepts</i> to the next hop on its route + * <i>or</i> deliver a <i>failure reply</i>. + * <li>Deliver replies back to message sources through all the intermediate hops. + * </ul> * - * <p>A runtime will typically</p> - * <ul><li>Create a message bus implementation and set properties on this - * implementation once.</li> - * <li>Create sessions using that message bus many places.</li></ul> + * A runtime will typically + * <ul> + * <li>Create a message bus implementation and set properties on this implementation once. + * <li>Create sessions using that message bus many places.</li> + * </ul> * - * @author btratseth - * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + * @author bratseth + * @author Simon Thoresen */ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler, ReplyHandler { @@ -101,9 +102,8 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler, // Attach and start network. this.net = net; net.attach(this); - if (!net.waitUntilReady(120)) { + if ( ! net.waitUntilReady(120)) throw new IllegalStateException("Network failed to become ready in time."); - } // Start messenger. msn = new Messenger(); diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/Network.java b/messagebus/src/main/java/com/yahoo/messagebus/network/Network.java index cd3b3286778..b0bbe4266c4 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/network/Network.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/network/Network.java @@ -21,28 +21,28 @@ public interface Network { * @param seconds The timeout. * @return True if ready. */ - public boolean waitUntilReady(double seconds); + boolean waitUntilReady(double seconds); /** * Attach the network layer to the given owner * * @param owner owner of the network */ - public void attach(NetworkOwner owner); + void attach(NetworkOwner owner); /** * Register a session name with the network layer. This will make the session visible to other nodes. * * @param session the session name */ - public void registerSession(String session); + void registerSession(String session); /** * Unregister a session name with the network layer. This will make the session unavailable for other nodes. * * @param session session name */ - public void unregisterSession(String session); + void unregisterSession(String session); /** * Resolves the service address of the recipient referenced by the given routing node. If a recipient can not be @@ -52,7 +52,7 @@ public interface Network { * @param recipient The node whose service address to allocate. * @return True if a service address was allocated. */ - public boolean allocServiceAddress(RoutingNode recipient); + boolean allocServiceAddress(RoutingNode recipient); /** * Frees the service address from the given routing node. This allows the network layer to track and close @@ -60,7 +60,7 @@ public interface Network { * * @param recipient The node whose service address to free. */ - public void freeServiceAddress(RoutingNode recipient); + void freeServiceAddress(RoutingNode recipient); /** * Send a message to the given recipients. A {@link RoutingNode} contains all the necessary context for sending. @@ -68,7 +68,7 @@ public interface Network { * @param msg The message to send. * @param recipients A list of routing leaf nodes resolved for the message. */ - public void send(Message msg, List<RoutingNode> recipients); + void send(Message msg, List<RoutingNode> recipients); /** * Synchronize with internal threads. This method will handshake with all internal threads. This has the implicit @@ -76,12 +76,12 @@ public interface Network { * that would make the thread wait for itself... forever. This method is typically used to untangle during session * shutdown. */ - public void sync(); + void sync(); /** * Shuts down the network. This is a blocking call that waits for all scheduled tasks to complete. */ - public void shutdown(); + void shutdown(); /** * Returns a string that represents the connection specs of this network. It is in not a complete address since it @@ -89,12 +89,13 @@ public interface Network { * * @return The connection string. */ - public String getConnectionSpec(); + String getConnectionSpec(); /** * Returns a reference to a name server mirror. * * @return The mirror object. */ - public IMirror getMirror(); + IMirror getMirror(); + } diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalNetwork.java b/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalNetwork.java index ffcb853a0a7..bbbb49e4370 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalNetwork.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalNetwork.java @@ -23,7 +23,7 @@ import java.util.concurrent.Executors; import static com.yahoo.messagebus.ErrorCode.NO_ADDRESS_FOR_SERVICE; /** - * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen Hult</a> + * @author Simon Thoresen Hult */ public class LocalNetwork implements Network { @@ -32,35 +32,35 @@ public class LocalNetwork implements Network { private final String hostId; private volatile NetworkOwner owner; - public LocalNetwork(final LocalWire wire) { + public LocalNetwork(LocalWire wire) { this.wire = wire; this.hostId = wire.newHostId(); } @Override - public boolean waitUntilReady(final double seconds) { + public boolean waitUntilReady(double seconds) { return true; } @Override - public void attach(final NetworkOwner owner) { + public void attach(NetworkOwner owner) { this.owner = owner; } @Override - public void registerSession(final String session) { + public void registerSession(String session) { wire.registerService(hostId + "/" + session, this); } @Override - public void unregisterSession(final String session) { + public void unregisterSession(String session) { wire.unregisterService(hostId + "/" + session); } @Override - public boolean allocServiceAddress(final RoutingNode recipient) { - final String service = recipient.getRoute().getHop(0).getServiceName(); - final ServiceAddress address = wire.resolveServiceAddress(service); + public boolean allocServiceAddress(RoutingNode recipient) { + String service = recipient.getRoute().getHop(0).getServiceName(); + ServiceAddress address = wire.resolveServiceAddress(service); if (address == null) { recipient.setError(new Error(NO_ADDRESS_FOR_SERVICE, "No address for service '" + service + "'.")); return false; @@ -70,24 +70,24 @@ public class LocalNetwork implements Network { } @Override - public void freeServiceAddress(final RoutingNode recipient) { + public void freeServiceAddress(RoutingNode recipient) { recipient.setServiceAddress(null); } @Override - public void send(final Message msg, final List<RoutingNode> recipients) { - for (final RoutingNode recipient : recipients) { + public void send(Message msg, List<RoutingNode> recipients) { + for (RoutingNode recipient : recipients) { new MessageEnvelope(this, msg, recipient).send(); } } - private void receiveLater(final MessageEnvelope envelope) { - final byte[] payload = envelope.sender.encode(envelope.msg.getProtocol(), envelope.msg); + private void receiveLater(MessageEnvelope envelope) { + byte[] payload = envelope.sender.encode(envelope.msg.getProtocol(), envelope.msg); executor.execute(new Runnable() { @Override public void run() { - final Message msg = decode(envelope.msg.getProtocol(), payload, Message.class); + Message msg = decode(envelope.msg.getProtocol(), payload, Message.class); msg.getTrace().setLevel(envelope.msg.getTrace().getLevel()); msg.setRoute(envelope.msg.getRoute()).getRoute().removeHop(0); msg.setRetryEnabled(envelope.msg.getRetryEnabled()); @@ -96,7 +96,7 @@ public class LocalNetwork implements Network { msg.pushHandler(new ReplyHandler() { @Override - public void handleReply(final Reply reply) { + public void handleReply(Reply reply) { new ReplyEnvelope(LocalNetwork.this, envelope, reply).send(); } }); @@ -106,17 +106,17 @@ public class LocalNetwork implements Network { }); } - private void receiveLater(final ReplyEnvelope envelope) { - final byte[] payload = envelope.sender.encode(envelope.reply.getProtocol(), envelope.reply); + private void receiveLater(ReplyEnvelope envelope) { + byte[] payload = envelope.sender.encode(envelope.reply.getProtocol(), envelope.reply); executor.execute(new Runnable() { @Override public void run() { - final Reply reply = decode(envelope.reply.getProtocol(), payload, Reply.class); + Reply reply = decode(envelope.reply.getProtocol(), payload, Reply.class); reply.setRetryDelay(envelope.reply.getRetryDelay()); reply.getTrace().getRoot().addChild(TraceNode.decode(envelope.reply.getTrace().getRoot().encode())); for (int i = 0, len = envelope.reply.getNumErrors(); i < len; ++i) { - final Error error = envelope.reply.getError(i); + Error error = envelope.reply.getError(i); reply.addError(new Error(error.getCode(), error.getMessage(), error.getService() != null ? error.getService() : envelope.sender.hostId)); @@ -126,7 +126,7 @@ public class LocalNetwork implements Network { }); } - private byte[] encode(final Utf8String protocolName, final Routable toEncode) { + private byte[] encode(Utf8String protocolName, Routable toEncode) { if (toEncode.getType() == 0) { return new byte[0]; } @@ -134,7 +134,7 @@ public class LocalNetwork implements Network { } @SuppressWarnings("unchecked") - private <T extends Routable> T decode(final Utf8String protocolName, final byte[] toDecode, final Class<T> clazz) { + private <T extends Routable> T decode(Utf8String protocolName, byte[] toDecode, Class<T> clazz) { if (toDecode.length == 0) { return clazz.cast(new EmptyReply()); } @@ -167,15 +167,14 @@ public class LocalNetwork implements Network { final Message msg; final RoutingNode recipient; - MessageEnvelope(final LocalNetwork sender, final Message msg, final RoutingNode recipient) { + MessageEnvelope(LocalNetwork sender, Message msg, RoutingNode recipient) { this.sender = sender; this.msg = msg; this.recipient = recipient; } void send() { - LocalServiceAddress.class.cast(recipient.getServiceAddress()) - .getNetwork().receiveLater(this); + LocalServiceAddress.class.cast(recipient.getServiceAddress()).getNetwork().receiveLater(this); } } @@ -185,7 +184,7 @@ public class LocalNetwork implements Network { final MessageEnvelope parent; final Reply reply; - ReplyEnvelope(final LocalNetwork sender, final MessageEnvelope parent, final Reply reply) { + ReplyEnvelope(LocalNetwork sender, MessageEnvelope parent, Reply reply) { this.sender = sender; this.parent = parent; this.reply = reply; @@ -195,4 +194,5 @@ public class LocalNetwork implements Network { parent.sender.receiveLater(this); } } + } diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalWire.java b/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalWire.java index 84ca8c64bc0..5c9035a5f99 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalWire.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalWire.java @@ -11,7 +11,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.regex.Pattern; /** - * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen Hult</a> + * @author Simon Thoresen Hult */ public class LocalWire implements IMirror { @@ -19,19 +19,19 @@ public class LocalWire implements IMirror { private final AtomicInteger updateCnt = new AtomicInteger(); private final ConcurrentHashMap<String, LocalNetwork> services = new ConcurrentHashMap<>(); - public void registerService(final String serviceName, final LocalNetwork owner) { + public void registerService(String serviceName, LocalNetwork owner) { if (services.putIfAbsent(serviceName, owner) != null) { throw new IllegalStateException(); } updateCnt.incrementAndGet(); } - public void unregisterService(final String serviceName) { + public void unregisterService(String serviceName) { services.remove(serviceName); updateCnt.incrementAndGet(); } - public LocalServiceAddress resolveServiceAddress(final String serviceName) { + public LocalServiceAddress resolveServiceAddress(String serviceName) { final LocalNetwork owner = services.get(serviceName); return owner != null ? new LocalServiceAddress(serviceName, owner) : null; } @@ -41,10 +41,10 @@ public class LocalWire implements IMirror { } @Override - public Mirror.Entry[] lookup(final String pattern) { - final List<Mirror.Entry> out = new ArrayList<>(); - final Pattern regex = Pattern.compile(pattern.replace("*", "[a-zA-Z0-9_-]+")); - for (final String key : services.keySet()) { + public Mirror.Entry[] lookup(String pattern) { + List<Mirror.Entry> out = new ArrayList<>(); + Pattern regex = Pattern.compile(pattern.replace("*", "[a-zA-Z0-9_-]+")); + for (String key : services.keySet()) { if (regex.matcher(key).matches()) { out.add(new Mirror.Entry(key, key)); } @@ -56,4 +56,5 @@ public class LocalWire implements IMirror { public int updates() { return updateCnt.get(); } + } |