diff options
5 files changed, 71 insertions, 87 deletions
diff --git a/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/MbusServerConformanceTest.java b/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/MbusServerConformanceTest.java index 71cbc422d9b..bf89f3869ed 100644 --- a/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/MbusServerConformanceTest.java +++ b/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/MbusServerConformanceTest.java @@ -581,17 +581,17 @@ public class MbusServerConformanceTest extends ServerProviderConformanceTest { new DestinationSessionParams()); } - TestRunner(final MessageBusParams mbusParams, final DestinationSessionParams sessionParams) { + TestRunner(MessageBusParams mbusParams, DestinationSessionParams sessionParams) { this.mbus = new SharedMessageBus(new MessageBus(new LocalNetwork(wire), mbusParams)); this.session = mbus.newDestinationSession(sessionParams); } - TestRunner setRequestTimeout(final long timeout, final TimeUnit unit) { + TestRunner setRequestTimeout(long timeout, TimeUnit unit) { timeoutMillis = unit.toMillis(timeout); return this; } - TestRunner expectError(final Matcher<Integer> matcher) { + TestRunner expectError(Matcher<Integer> matcher) { assertThat(successExpected, is(false)); expectedError = matcher; return this; @@ -620,12 +620,12 @@ public class MbusServerConformanceTest extends ServerProviderConformanceTest { } @Override - public MyClient newClient(final MbusServer server) throws Throwable { + public MyClient newClient(MbusServer server) throws Throwable { return new MyClient(wire, server.connectionSpec()); } @Override - public Reply executeRequest(final MyClient client, final boolean withRequestContent) throws Throwable { + public Reply executeRequest(MyClient client, boolean withRequestContent) throws Throwable { // This protocol doesn't have the concept of "request content", so if we are asked to send any, it's a bug. assertThat(withRequestContent, is(false)); @@ -647,7 +647,7 @@ public class MbusServerConformanceTest extends ServerProviderConformanceTest { } @Override - public void validateResponse(final Reply reply) throws Throwable { + public void validateResponse(Reply reply) throws Throwable { final String trace = String.valueOf(reply.getTrace()); if (expectedError != null) { assertThat(reply.hasErrors(), is(true)); @@ -673,7 +673,7 @@ public class MbusServerConformanceTest extends ServerProviderConformanceTest { final Route route; final SourceSession session; - MyClient(final LocalWire wire, final String connectionSpec) { + MyClient(LocalWire wire, String connectionSpec) { this.mbus = new MessageBus(new LocalNetwork(wire), new MessageBusParams().addProtocol(new SimpleProtocol())); this.session = mbus.createSourceSession(new SourceSessionParams().setReplyHandler(this)); @@ -687,7 +687,7 @@ public class MbusServerConformanceTest extends ServerProviderConformanceTest { } @Override - public void handleReply(final Reply reply) { + public void handleReply(Reply reply) { replies.addLast(reply); } } diff --git a/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/shared/SharedIntermediateSessionTestCase.java b/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/shared/SharedIntermediateSessionTestCase.java index 3b063d148e6..ee3c5c993db 100644 --- a/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/shared/SharedIntermediateSessionTestCase.java +++ b/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/shared/SharedIntermediateSessionTestCase.java @@ -8,6 +8,8 @@ import com.yahoo.messagebus.jdisc.test.MessageQueue; import com.yahoo.messagebus.jdisc.test.RemoteClient; import com.yahoo.messagebus.jdisc.test.RemoteServer; import com.yahoo.messagebus.jdisc.test.ReplyQueue; +import com.yahoo.messagebus.network.local.LocalNetwork; +import com.yahoo.messagebus.network.local.LocalWire; import com.yahoo.messagebus.network.rpc.RPCNetworkParams; import com.yahoo.messagebus.routing.Route; import com.yahoo.messagebus.test.SimpleMessage; @@ -26,7 +28,7 @@ public class SharedIntermediateSessionTestCase { @Test public void requireThatMessageHandlerCanBeAccessed() { - SharedIntermediateSession session = newIntermediateSession(); + SharedIntermediateSession session = newIntermediateSession(false); assertNull(session.getMessageHandler()); MessageQueue handler = new MessageQueue(); @@ -36,7 +38,7 @@ public class SharedIntermediateSessionTestCase { @Test public void requireThatMessageHandlerCanOnlyBeSetOnce() { - SharedIntermediateSession session = newIntermediateSession(); + SharedIntermediateSession session = newIntermediateSession(false); session.setMessageHandler(new MessageQueue()); try { session.setMessageHandler(new MessageQueue()); @@ -49,7 +51,7 @@ public class SharedIntermediateSessionTestCase { @Test public void requireThatMessageHandlerIsCalled() throws InterruptedException { - SharedIntermediateSession session = newIntermediateSession(); + SharedIntermediateSession session = newIntermediateSession(false); MessageQueue queue = new MessageQueue(); session.setMessageHandler(queue); session.handleMessage(new SimpleMessage("foo")); @@ -59,7 +61,7 @@ public class SharedIntermediateSessionTestCase { @Test public void requireThatSessionRepliesIfMessageHandlerIsNull() throws InterruptedException { - SharedIntermediateSession session = newIntermediateSession(); + SharedIntermediateSession session = newIntermediateSession(false); Message msg = new SimpleMessage("foo"); ReplyQueue queue = new ReplyQueue(); msg.pushHandler(queue); @@ -76,7 +78,8 @@ public class SharedIntermediateSessionTestCase { Slobrok slobrok = new Slobrok(); try { newIntermediateSession(slobrok.configId(), - new IntermediateSessionParams().setReplyHandler(new ReplyQueue())); + new IntermediateSessionParams().setReplyHandler(new ReplyQueue()), + false); fail(); } catch (IllegalArgumentException e) { assertEquals("Reply handler must be null.", e.getMessage()); @@ -85,21 +88,20 @@ public class SharedIntermediateSessionTestCase { @Test public void requireThatSessionIsClosedOnDestroy() { - SharedIntermediateSession session = newIntermediateSession(); + SharedIntermediateSession session = newIntermediateSession(false); session.release(); assertFalse("IntermediateSession not destroyed by release().", session.session().destroy()); } @Test public void requireThatMbusIsReleasedOnDestroy() { - Slobrok slobrok = null; try { - slobrok = new Slobrok(); + new Slobrok(); } catch (ListenFailedException e) { fail(); } - RPCNetworkParams netParams = new RPCNetworkParams().setSlobrokConfigId(slobrok.configId()); - SharedMessageBus mbus = SharedMessageBus.newInstance(new MessageBusParams(), netParams); + SharedMessageBus mbus = new SharedMessageBus(new MessageBus(new LocalNetwork(new LocalWire()), new MessageBusParams())); + SharedIntermediateSession session = mbus.newIntermediateSession(new IntermediateSessionParams()); mbus.release(); session.release(); @@ -110,7 +112,8 @@ public class SharedIntermediateSessionTestCase { public void requireThatSessionCanSendMessage() throws InterruptedException { RemoteServer server = RemoteServer.newInstanceWithInternSlobrok(); SharedIntermediateSession session = newIntermediateSession(server.slobrokId(), - new IntermediateSessionParams()); + new IntermediateSessionParams(), + true); ReplyQueue queue = new ReplyQueue(); Message msg = new SimpleMessage("foo").setRoute(Route.parse(server.connectionSpec())); msg.setTimeReceivedNow(); @@ -131,7 +134,7 @@ public class SharedIntermediateSessionTestCase { RemoteClient client = RemoteClient.newInstanceWithInternSlobrok(); MessageQueue queue = new MessageQueue(); IntermediateSessionParams params = new IntermediateSessionParams().setMessageHandler(queue); - SharedIntermediateSession session = newIntermediateSession(client.slobrokId(), params); + SharedIntermediateSession session = newIntermediateSession(client.slobrokId(), params, true); Route route = Route.parse(session.connectionSpec()); assertTrue(client.sendMessage(new SimpleMessage("foo").setRoute(route)).isAccepted()); @@ -146,20 +149,24 @@ public class SharedIntermediateSessionTestCase { client.close(); } - private static SharedIntermediateSession newIntermediateSession() { + private static SharedIntermediateSession newIntermediateSession(boolean network) { Slobrok slobrok = null; try { slobrok = new Slobrok(); } catch (ListenFailedException e) { fail(); } - return newIntermediateSession(slobrok.configId(), new IntermediateSessionParams()); + return newIntermediateSession(slobrok.configId(), new IntermediateSessionParams(), network); } - private static SharedIntermediateSession newIntermediateSession(String slobrokId, IntermediateSessionParams params) { + private static SharedIntermediateSession newIntermediateSession(String slobrokId, + IntermediateSessionParams params, + boolean network) { RPCNetworkParams netParams = new RPCNetworkParams().setSlobrokConfigId(slobrokId); MessageBusParams mbusParams = new MessageBusParams().addProtocol(new SimpleProtocol()); - SharedMessageBus mbus = SharedMessageBus.newInstance(mbusParams, netParams); + SharedMessageBus mbus = network + ? SharedMessageBus.newInstance(mbusParams, netParams) + : new SharedMessageBus(new MessageBus(new LocalNetwork(new LocalWire()), mbusParams)); SharedIntermediateSession session = mbus.newIntermediateSession(params); mbus.release(); return session; diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/Network.java b/messagebus/src/main/java/com/yahoo/messagebus/network/Network.java index 3c577fddcc2..095ff06b033 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/network/Network.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/network/Network.java @@ -9,22 +9,22 @@ import java.util.List; /** * This interface separates the low-level network implementation from the rest of messagebus. The methods defined in - * this interface is intended to be invoked by MessageBus and not by the application. + * this interface are intended to be invoked by MessageBus and not by the application. * - * @author <a href="mailto:havardpe@yahoo-inc.com">Haavard Pettersen</a> + * @author havardpe */ public interface Network { /** * Waits for at most the given number of seconds for all dependencies to become ready. * - * @param seconds The timeout. - * @return True if ready. + * @param seconds the timeout + * @return true if ready */ boolean waitUntilReady(double seconds); /** - * Attach the network layer to the given owner + * Attach the network layer to the given owner. * * @param owner owner of the network */ @@ -49,8 +49,8 @@ public interface Network { * resolved, this method tags the node with an error. If this method succeeds, you need to invoke {@link * #freeServiceAddress(RoutingNode)} once you are done with the service address. * - * @param recipient The node whose service address to allocate. - * @return True if a service address was allocated. + * @param recipient the node whose service address to allocate + * @return true if a service address was allocated */ boolean allocServiceAddress(RoutingNode recipient); @@ -58,15 +58,15 @@ public interface Network { * Frees the service address from the given routing node. This allows the network layer to track and close * connections as required. * - * @param recipient The node whose service address to free. + * @param recipient the node whose service address to free */ void freeServiceAddress(RoutingNode recipient); /** * Send a message to the given recipients. A {@link RoutingNode} contains all the necessary context for sending. * - * @param msg The message to send. - * @param recipients A list of routing leaf nodes resolved for the message. + * @param msg the message to send + * @param recipients a list of routing leaf nodes resolved for the message */ void send(Message msg, List<RoutingNode> recipients); @@ -78,24 +78,16 @@ public interface Network { */ void sync(); - /** - * Shuts down the network. This is a blocking call that waits for all scheduled tasks to complete. - */ + /** Shuts down the network. This is a blocking call that waits for all scheduled tasks to complete. */ void shutdown(); /** - * Returns a string that represents the connection specs of this network. It is in not a complete address since it - * know nothing of the sessions that run on it. - * - * @return The connection string. + * Returns a string that represents the connection specs of this network. + * It is in not a complete address since it know nothing of the sessions that run on it. */ String getConnectionSpec(); - /** - * Returns a reference to a name server mirror. - * - * @return The mirror object. - */ + /** Returns a reference to a name server mirror. */ IMirror getMirror(); } diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java index 5a7ca9c6f1a..b0724ad6029 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java @@ -47,7 +47,7 @@ import java.util.stream.Collectors; /** * An RPC implementation of the Network interface. * - * @author <a href="mailto:havardpe@yahoo-inc.com">Haavard Pettersen</a> + * @author havardpe */ public class RPCNetwork implements Network, MethodHandler { @@ -81,7 +81,7 @@ public class RPCNetwork implements Network, MethodHandler { * prefix is 'a/b' and the session name is 'c', the resulting service name that identifies the session on the * message bus will be 'a/b/c' * - * @param params A complete set of parameters. + * @param params a complete set of parameters * @param slobrokConfig subscriber for slobroks config */ public RPCNetwork(RPCNetworkParams params, SlobrokConfigSubscriber slobrokConfig) { @@ -115,7 +115,7 @@ public class RPCNetwork implements Network, MethodHandler { * prefix is 'a/b' and the session name is 'c', the resulting service name that identifies the session on the * message bus will be 'a/b/c' * - * @param params A complete set of parameters. + * @param params a complete set of parameters */ public RPCNetwork(RPCNetworkParams params) { this(params, params.getSlobroksConfig() != null ? new SlobrokConfigSubscriber(params.getSlobroksConfig()) @@ -249,7 +249,7 @@ public class RPCNetwork implements Network, MethodHandler { * resolved. If all versions were resolved ahead of time, this method is invoked by the same thread as the former. * If not, this method is invoked by the network thread during the version callback. * - * @param ctx All the required send-data. + * @param ctx all the required send-data */ private void send(SendContext ctx) { if (destroyed.get()) { @@ -267,7 +267,7 @@ public class RPCNetwork implements Network, MethodHandler { * Sets the destroyed flag to true. The very first time this method is called, it cleans up all its dependencies. * Even if you retain a reference to this object, all of its content is allowed to be garbage collected. * - * @return True if content existed and was destroyed. + * @return true if content existed and was destroyed */ public boolean destroy() { if (!destroyed.getAndSet(true)) { @@ -290,7 +290,7 @@ public class RPCNetwork implements Network, MethodHandler { * network, and is separated into its own function so that unit tests can override it to simulate other versions * than current. * - * @return The version to claim to be. + * @return the version to claim to be */ protected Version getVersion() { return Vtag.currentVersion; @@ -301,9 +301,9 @@ public class RPCNetwork implements Network, MethodHandler { * {@link #allocServiceAddress(RoutingNode)} method. The target allocated here is released when the routing node * calls {@link #freeServiceAddress(RoutingNode)}. * - * @param recipient The recipient to assign the service address to. - * @param serviceName The name of the service to resolve. - * @return Any error encountered, or null. + * @param recipient the recipient to assign the service address to + * @param serviceName the name of the service to resolve + * @return any error encountered, or null */ public Error resolveServiceAddress(RoutingNode recipient, String serviceName) { RPCServiceAddress ret = servicePool.resolve(serviceName); @@ -329,8 +329,8 @@ public class RPCNetwork implements Network, MethodHandler { * Registers a send adapter for a given version. This will overwrite whatever is already registered under the same * version. * - * @param version The version for which to register an adapter. - * @param adapter The adapter to register. + * @param version the version for which to register an adapter + * @param adapter the adapter to register */ private void addSendAdapter(Version version, RPCSendAdapter adapter) { adapter.attach(this); @@ -341,8 +341,8 @@ public class RPCNetwork implements Network, MethodHandler { * Determines and returns the send adapter that is compatible with the given version. If no adapter can be found, * this method returns null. * - * @param version The version for which to return an adapter. - * @return The compatible adapter. + * @param version the version for which to return an adapter + * @return the compatible adapter */ public RPCSendAdapter getSendAdapter(Version version) { Map.Entry<Version, RPCSendAdapter> lower = sendAdapters.floorEntry(version); @@ -352,9 +352,9 @@ public class RPCNetwork implements Network, MethodHandler { /** * Deliver an error reply to the recipients of a {@link SendContext} in a way that avoids entanglement. * - * @param ctx The send context that contains the recipient data. - * @param errCode The error code to return. - * @param errMsg The error string to return. + * @param ctx the send context that contains the recipient data + * @param errCode the error code to return + * @param errMsg the error string to return */ private void replyError(SendContext ctx, int errCode, String errMsg) { for (RoutingNode recipient : ctx.recipients) { @@ -365,38 +365,22 @@ public class RPCNetwork implements Network, MethodHandler { } } - /** - * Get the owner of this network - * - * @return network owner - */ + /** Returns the owner of this network. */ NetworkOwner getOwner() { return owner; } - /** - * Returns the identity of this network. - * - * @return The identity. - */ + /** Returns the identity of this network. */ public Identity getIdentity() { return identity; } - /** - * Obtain the port number this network listens to - * - * @return listening port number - */ + /** Returns the port number this network listens to. */ public int getPort() { return listener.port(); } - /** - * Returns the JRT supervisor. - * - * @return The supervisor. - */ + /** Returns the JRT supervisor. */ Supervisor getSupervisor() { return orb; } @@ -492,8 +476,8 @@ public class RPCNetwork implements Network, MethodHandler { } /** - * Implements a helper class to invoke {@link RPCTargetPool#flushTargets(boolean)} once every second. This is to - * unentangle the target pool from the scheduler. + * Implements a helper class to invoke {@link RPCTargetPool#flushTargets(boolean)} once every second. + * This is to untangle the target pool from the scheduler. */ private static class TargetPoolTask implements Runnable { @@ -512,4 +496,5 @@ public class RPCNetwork implements Network, MethodHandler { jrtTask.schedule(1.0); } } + } diff --git a/messagebus/src/test/java/com/yahoo/messagebus/network/local/LocalNetworkTest.java b/messagebus/src/test/java/com/yahoo/messagebus/network/local/LocalNetworkTest.java index aec38e7d157..c4bcaa488e2 100644 --- a/messagebus/src/test/java/com/yahoo/messagebus/network/local/LocalNetworkTest.java +++ b/messagebus/src/test/java/com/yahoo/messagebus/network/local/LocalNetworkTest.java @@ -164,7 +164,7 @@ public class LocalNetworkTest { final BlockingDeque<Message> messages = new LinkedBlockingDeque<>(); final BlockingDeque<Reply> replies = new LinkedBlockingDeque<>(); - Server(final LocalWire wire) { + Server(LocalWire wire) { mbus = new MessageBus(new LocalNetwork(wire), new MessageBusParams().addProtocol(new SimpleProtocol()) .setRetryPolicy(null)); @@ -193,12 +193,12 @@ public class LocalNetworkTest { } @Override - public void handleMessage(final Message msg) { + public void handleMessage(Message msg) { messages.addLast(msg); } @Override - public void handleReply(final Reply reply) { + public void handleReply(Reply reply) { replies.addLast(reply); } } |