summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/MbusServerConformanceTest.java16
-rw-r--r--jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/shared/SharedIntermediateSessionTestCase.java39
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/network/Network.java36
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java61
-rw-r--r--messagebus/src/test/java/com/yahoo/messagebus/network/local/LocalNetworkTest.java6
5 files changed, 71 insertions, 87 deletions
diff --git a/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/MbusServerConformanceTest.java b/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/MbusServerConformanceTest.java
index 71cbc422d9b..bf89f3869ed 100644
--- a/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/MbusServerConformanceTest.java
+++ b/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/MbusServerConformanceTest.java
@@ -581,17 +581,17 @@ public class MbusServerConformanceTest extends ServerProviderConformanceTest {
new DestinationSessionParams());
}
- TestRunner(final MessageBusParams mbusParams, final DestinationSessionParams sessionParams) {
+ TestRunner(MessageBusParams mbusParams, DestinationSessionParams sessionParams) {
this.mbus = new SharedMessageBus(new MessageBus(new LocalNetwork(wire), mbusParams));
this.session = mbus.newDestinationSession(sessionParams);
}
- TestRunner setRequestTimeout(final long timeout, final TimeUnit unit) {
+ TestRunner setRequestTimeout(long timeout, TimeUnit unit) {
timeoutMillis = unit.toMillis(timeout);
return this;
}
- TestRunner expectError(final Matcher<Integer> matcher) {
+ TestRunner expectError(Matcher<Integer> matcher) {
assertThat(successExpected, is(false));
expectedError = matcher;
return this;
@@ -620,12 +620,12 @@ public class MbusServerConformanceTest extends ServerProviderConformanceTest {
}
@Override
- public MyClient newClient(final MbusServer server) throws Throwable {
+ public MyClient newClient(MbusServer server) throws Throwable {
return new MyClient(wire, server.connectionSpec());
}
@Override
- public Reply executeRequest(final MyClient client, final boolean withRequestContent) throws Throwable {
+ public Reply executeRequest(MyClient client, boolean withRequestContent) throws Throwable {
// This protocol doesn't have the concept of "request content", so if we are asked to send any, it's a bug.
assertThat(withRequestContent, is(false));
@@ -647,7 +647,7 @@ public class MbusServerConformanceTest extends ServerProviderConformanceTest {
}
@Override
- public void validateResponse(final Reply reply) throws Throwable {
+ public void validateResponse(Reply reply) throws Throwable {
final String trace = String.valueOf(reply.getTrace());
if (expectedError != null) {
assertThat(reply.hasErrors(), is(true));
@@ -673,7 +673,7 @@ public class MbusServerConformanceTest extends ServerProviderConformanceTest {
final Route route;
final SourceSession session;
- MyClient(final LocalWire wire, final String connectionSpec) {
+ MyClient(LocalWire wire, String connectionSpec) {
this.mbus = new MessageBus(new LocalNetwork(wire),
new MessageBusParams().addProtocol(new SimpleProtocol()));
this.session = mbus.createSourceSession(new SourceSessionParams().setReplyHandler(this));
@@ -687,7 +687,7 @@ public class MbusServerConformanceTest extends ServerProviderConformanceTest {
}
@Override
- public void handleReply(final Reply reply) {
+ public void handleReply(Reply reply) {
replies.addLast(reply);
}
}
diff --git a/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/shared/SharedIntermediateSessionTestCase.java b/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/shared/SharedIntermediateSessionTestCase.java
index 3b063d148e6..ee3c5c993db 100644
--- a/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/shared/SharedIntermediateSessionTestCase.java
+++ b/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/shared/SharedIntermediateSessionTestCase.java
@@ -8,6 +8,8 @@ import com.yahoo.messagebus.jdisc.test.MessageQueue;
import com.yahoo.messagebus.jdisc.test.RemoteClient;
import com.yahoo.messagebus.jdisc.test.RemoteServer;
import com.yahoo.messagebus.jdisc.test.ReplyQueue;
+import com.yahoo.messagebus.network.local.LocalNetwork;
+import com.yahoo.messagebus.network.local.LocalWire;
import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
import com.yahoo.messagebus.routing.Route;
import com.yahoo.messagebus.test.SimpleMessage;
@@ -26,7 +28,7 @@ public class SharedIntermediateSessionTestCase {
@Test
public void requireThatMessageHandlerCanBeAccessed() {
- SharedIntermediateSession session = newIntermediateSession();
+ SharedIntermediateSession session = newIntermediateSession(false);
assertNull(session.getMessageHandler());
MessageQueue handler = new MessageQueue();
@@ -36,7 +38,7 @@ public class SharedIntermediateSessionTestCase {
@Test
public void requireThatMessageHandlerCanOnlyBeSetOnce() {
- SharedIntermediateSession session = newIntermediateSession();
+ SharedIntermediateSession session = newIntermediateSession(false);
session.setMessageHandler(new MessageQueue());
try {
session.setMessageHandler(new MessageQueue());
@@ -49,7 +51,7 @@ public class SharedIntermediateSessionTestCase {
@Test
public void requireThatMessageHandlerIsCalled() throws InterruptedException {
- SharedIntermediateSession session = newIntermediateSession();
+ SharedIntermediateSession session = newIntermediateSession(false);
MessageQueue queue = new MessageQueue();
session.setMessageHandler(queue);
session.handleMessage(new SimpleMessage("foo"));
@@ -59,7 +61,7 @@ public class SharedIntermediateSessionTestCase {
@Test
public void requireThatSessionRepliesIfMessageHandlerIsNull() throws InterruptedException {
- SharedIntermediateSession session = newIntermediateSession();
+ SharedIntermediateSession session = newIntermediateSession(false);
Message msg = new SimpleMessage("foo");
ReplyQueue queue = new ReplyQueue();
msg.pushHandler(queue);
@@ -76,7 +78,8 @@ public class SharedIntermediateSessionTestCase {
Slobrok slobrok = new Slobrok();
try {
newIntermediateSession(slobrok.configId(),
- new IntermediateSessionParams().setReplyHandler(new ReplyQueue()));
+ new IntermediateSessionParams().setReplyHandler(new ReplyQueue()),
+ false);
fail();
} catch (IllegalArgumentException e) {
assertEquals("Reply handler must be null.", e.getMessage());
@@ -85,21 +88,20 @@ public class SharedIntermediateSessionTestCase {
@Test
public void requireThatSessionIsClosedOnDestroy() {
- SharedIntermediateSession session = newIntermediateSession();
+ SharedIntermediateSession session = newIntermediateSession(false);
session.release();
assertFalse("IntermediateSession not destroyed by release().", session.session().destroy());
}
@Test
public void requireThatMbusIsReleasedOnDestroy() {
- Slobrok slobrok = null;
try {
- slobrok = new Slobrok();
+ new Slobrok();
} catch (ListenFailedException e) {
fail();
}
- RPCNetworkParams netParams = new RPCNetworkParams().setSlobrokConfigId(slobrok.configId());
- SharedMessageBus mbus = SharedMessageBus.newInstance(new MessageBusParams(), netParams);
+ SharedMessageBus mbus = new SharedMessageBus(new MessageBus(new LocalNetwork(new LocalWire()), new MessageBusParams()));
+
SharedIntermediateSession session = mbus.newIntermediateSession(new IntermediateSessionParams());
mbus.release();
session.release();
@@ -110,7 +112,8 @@ public class SharedIntermediateSessionTestCase {
public void requireThatSessionCanSendMessage() throws InterruptedException {
RemoteServer server = RemoteServer.newInstanceWithInternSlobrok();
SharedIntermediateSession session = newIntermediateSession(server.slobrokId(),
- new IntermediateSessionParams());
+ new IntermediateSessionParams(),
+ true);
ReplyQueue queue = new ReplyQueue();
Message msg = new SimpleMessage("foo").setRoute(Route.parse(server.connectionSpec()));
msg.setTimeReceivedNow();
@@ -131,7 +134,7 @@ public class SharedIntermediateSessionTestCase {
RemoteClient client = RemoteClient.newInstanceWithInternSlobrok();
MessageQueue queue = new MessageQueue();
IntermediateSessionParams params = new IntermediateSessionParams().setMessageHandler(queue);
- SharedIntermediateSession session = newIntermediateSession(client.slobrokId(), params);
+ SharedIntermediateSession session = newIntermediateSession(client.slobrokId(), params, true);
Route route = Route.parse(session.connectionSpec());
assertTrue(client.sendMessage(new SimpleMessage("foo").setRoute(route)).isAccepted());
@@ -146,20 +149,24 @@ public class SharedIntermediateSessionTestCase {
client.close();
}
- private static SharedIntermediateSession newIntermediateSession() {
+ private static SharedIntermediateSession newIntermediateSession(boolean network) {
Slobrok slobrok = null;
try {
slobrok = new Slobrok();
} catch (ListenFailedException e) {
fail();
}
- return newIntermediateSession(slobrok.configId(), new IntermediateSessionParams());
+ return newIntermediateSession(slobrok.configId(), new IntermediateSessionParams(), network);
}
- private static SharedIntermediateSession newIntermediateSession(String slobrokId, IntermediateSessionParams params) {
+ private static SharedIntermediateSession newIntermediateSession(String slobrokId,
+ IntermediateSessionParams params,
+ boolean network) {
RPCNetworkParams netParams = new RPCNetworkParams().setSlobrokConfigId(slobrokId);
MessageBusParams mbusParams = new MessageBusParams().addProtocol(new SimpleProtocol());
- SharedMessageBus mbus = SharedMessageBus.newInstance(mbusParams, netParams);
+ SharedMessageBus mbus = network
+ ? SharedMessageBus.newInstance(mbusParams, netParams)
+ : new SharedMessageBus(new MessageBus(new LocalNetwork(new LocalWire()), mbusParams));
SharedIntermediateSession session = mbus.newIntermediateSession(params);
mbus.release();
return session;
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/Network.java b/messagebus/src/main/java/com/yahoo/messagebus/network/Network.java
index 3c577fddcc2..095ff06b033 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/network/Network.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/network/Network.java
@@ -9,22 +9,22 @@ import java.util.List;
/**
* This interface separates the low-level network implementation from the rest of messagebus. The methods defined in
- * this interface is intended to be invoked by MessageBus and not by the application.
+ * this interface are intended to be invoked by MessageBus and not by the application.
*
- * @author <a href="mailto:havardpe@yahoo-inc.com">Haavard Pettersen</a>
+ * @author havardpe
*/
public interface Network {
/**
* Waits for at most the given number of seconds for all dependencies to become ready.
*
- * @param seconds The timeout.
- * @return True if ready.
+ * @param seconds the timeout
+ * @return true if ready
*/
boolean waitUntilReady(double seconds);
/**
- * Attach the network layer to the given owner
+ * Attach the network layer to the given owner.
*
* @param owner owner of the network
*/
@@ -49,8 +49,8 @@ public interface Network {
* resolved, this method tags the node with an error. If this method succeeds, you need to invoke {@link
* #freeServiceAddress(RoutingNode)} once you are done with the service address.
*
- * @param recipient The node whose service address to allocate.
- * @return True if a service address was allocated.
+ * @param recipient the node whose service address to allocate
+ * @return true if a service address was allocated
*/
boolean allocServiceAddress(RoutingNode recipient);
@@ -58,15 +58,15 @@ public interface Network {
* Frees the service address from the given routing node. This allows the network layer to track and close
* connections as required.
*
- * @param recipient The node whose service address to free.
+ * @param recipient the node whose service address to free
*/
void freeServiceAddress(RoutingNode recipient);
/**
* Send a message to the given recipients. A {@link RoutingNode} contains all the necessary context for sending.
*
- * @param msg The message to send.
- * @param recipients A list of routing leaf nodes resolved for the message.
+ * @param msg the message to send
+ * @param recipients a list of routing leaf nodes resolved for the message
*/
void send(Message msg, List<RoutingNode> recipients);
@@ -78,24 +78,16 @@ public interface Network {
*/
void sync();
- /**
- * Shuts down the network. This is a blocking call that waits for all scheduled tasks to complete.
- */
+ /** Shuts down the network. This is a blocking call that waits for all scheduled tasks to complete. */
void shutdown();
/**
- * Returns a string that represents the connection specs of this network. It is in not a complete address since it
- * know nothing of the sessions that run on it.
- *
- * @return The connection string.
+ * Returns a string that represents the connection specs of this network.
+ * It is in not a complete address since it know nothing of the sessions that run on it.
*/
String getConnectionSpec();
- /**
- * Returns a reference to a name server mirror.
- *
- * @return The mirror object.
- */
+ /** Returns a reference to a name server mirror. */
IMirror getMirror();
}
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java
index 5a7ca9c6f1a..b0724ad6029 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java
@@ -47,7 +47,7 @@ import java.util.stream.Collectors;
/**
* An RPC implementation of the Network interface.
*
- * @author <a href="mailto:havardpe@yahoo-inc.com">Haavard Pettersen</a>
+ * @author havardpe
*/
public class RPCNetwork implements Network, MethodHandler {
@@ -81,7 +81,7 @@ public class RPCNetwork implements Network, MethodHandler {
* prefix is 'a/b' and the session name is 'c', the resulting service name that identifies the session on the
* message bus will be 'a/b/c'
*
- * @param params A complete set of parameters.
+ * @param params a complete set of parameters
* @param slobrokConfig subscriber for slobroks config
*/
public RPCNetwork(RPCNetworkParams params, SlobrokConfigSubscriber slobrokConfig) {
@@ -115,7 +115,7 @@ public class RPCNetwork implements Network, MethodHandler {
* prefix is 'a/b' and the session name is 'c', the resulting service name that identifies the session on the
* message bus will be 'a/b/c'
*
- * @param params A complete set of parameters.
+ * @param params a complete set of parameters
*/
public RPCNetwork(RPCNetworkParams params) {
this(params, params.getSlobroksConfig() != null ? new SlobrokConfigSubscriber(params.getSlobroksConfig())
@@ -249,7 +249,7 @@ public class RPCNetwork implements Network, MethodHandler {
* resolved. If all versions were resolved ahead of time, this method is invoked by the same thread as the former.
* If not, this method is invoked by the network thread during the version callback.
*
- * @param ctx All the required send-data.
+ * @param ctx all the required send-data
*/
private void send(SendContext ctx) {
if (destroyed.get()) {
@@ -267,7 +267,7 @@ public class RPCNetwork implements Network, MethodHandler {
* Sets the destroyed flag to true. The very first time this method is called, it cleans up all its dependencies.
* Even if you retain a reference to this object, all of its content is allowed to be garbage collected.
*
- * @return True if content existed and was destroyed.
+ * @return true if content existed and was destroyed
*/
public boolean destroy() {
if (!destroyed.getAndSet(true)) {
@@ -290,7 +290,7 @@ public class RPCNetwork implements Network, MethodHandler {
* network, and is separated into its own function so that unit tests can override it to simulate other versions
* than current.
*
- * @return The version to claim to be.
+ * @return the version to claim to be
*/
protected Version getVersion() {
return Vtag.currentVersion;
@@ -301,9 +301,9 @@ public class RPCNetwork implements Network, MethodHandler {
* {@link #allocServiceAddress(RoutingNode)} method. The target allocated here is released when the routing node
* calls {@link #freeServiceAddress(RoutingNode)}.
*
- * @param recipient The recipient to assign the service address to.
- * @param serviceName The name of the service to resolve.
- * @return Any error encountered, or null.
+ * @param recipient the recipient to assign the service address to
+ * @param serviceName the name of the service to resolve
+ * @return any error encountered, or null
*/
public Error resolveServiceAddress(RoutingNode recipient, String serviceName) {
RPCServiceAddress ret = servicePool.resolve(serviceName);
@@ -329,8 +329,8 @@ public class RPCNetwork implements Network, MethodHandler {
* Registers a send adapter for a given version. This will overwrite whatever is already registered under the same
* version.
*
- * @param version The version for which to register an adapter.
- * @param adapter The adapter to register.
+ * @param version the version for which to register an adapter
+ * @param adapter the adapter to register
*/
private void addSendAdapter(Version version, RPCSendAdapter adapter) {
adapter.attach(this);
@@ -341,8 +341,8 @@ public class RPCNetwork implements Network, MethodHandler {
* Determines and returns the send adapter that is compatible with the given version. If no adapter can be found,
* this method returns null.
*
- * @param version The version for which to return an adapter.
- * @return The compatible adapter.
+ * @param version the version for which to return an adapter
+ * @return the compatible adapter
*/
public RPCSendAdapter getSendAdapter(Version version) {
Map.Entry<Version, RPCSendAdapter> lower = sendAdapters.floorEntry(version);
@@ -352,9 +352,9 @@ public class RPCNetwork implements Network, MethodHandler {
/**
* Deliver an error reply to the recipients of a {@link SendContext} in a way that avoids entanglement.
*
- * @param ctx The send context that contains the recipient data.
- * @param errCode The error code to return.
- * @param errMsg The error string to return.
+ * @param ctx the send context that contains the recipient data
+ * @param errCode the error code to return
+ * @param errMsg the error string to return
*/
private void replyError(SendContext ctx, int errCode, String errMsg) {
for (RoutingNode recipient : ctx.recipients) {
@@ -365,38 +365,22 @@ public class RPCNetwork implements Network, MethodHandler {
}
}
- /**
- * Get the owner of this network
- *
- * @return network owner
- */
+ /** Returns the owner of this network. */
NetworkOwner getOwner() {
return owner;
}
- /**
- * Returns the identity of this network.
- *
- * @return The identity.
- */
+ /** Returns the identity of this network. */
public Identity getIdentity() {
return identity;
}
- /**
- * Obtain the port number this network listens to
- *
- * @return listening port number
- */
+ /** Returns the port number this network listens to. */
public int getPort() {
return listener.port();
}
- /**
- * Returns the JRT supervisor.
- *
- * @return The supervisor.
- */
+ /** Returns the JRT supervisor. */
Supervisor getSupervisor() {
return orb;
}
@@ -492,8 +476,8 @@ public class RPCNetwork implements Network, MethodHandler {
}
/**
- * Implements a helper class to invoke {@link RPCTargetPool#flushTargets(boolean)} once every second. This is to
- * unentangle the target pool from the scheduler.
+ * Implements a helper class to invoke {@link RPCTargetPool#flushTargets(boolean)} once every second.
+ * This is to untangle the target pool from the scheduler.
*/
private static class TargetPoolTask implements Runnable {
@@ -512,4 +496,5 @@ public class RPCNetwork implements Network, MethodHandler {
jrtTask.schedule(1.0);
}
}
+
}
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/network/local/LocalNetworkTest.java b/messagebus/src/test/java/com/yahoo/messagebus/network/local/LocalNetworkTest.java
index aec38e7d157..c4bcaa488e2 100644
--- a/messagebus/src/test/java/com/yahoo/messagebus/network/local/LocalNetworkTest.java
+++ b/messagebus/src/test/java/com/yahoo/messagebus/network/local/LocalNetworkTest.java
@@ -164,7 +164,7 @@ public class LocalNetworkTest {
final BlockingDeque<Message> messages = new LinkedBlockingDeque<>();
final BlockingDeque<Reply> replies = new LinkedBlockingDeque<>();
- Server(final LocalWire wire) {
+ Server(LocalWire wire) {
mbus = new MessageBus(new LocalNetwork(wire),
new MessageBusParams().addProtocol(new SimpleProtocol())
.setRetryPolicy(null));
@@ -193,12 +193,12 @@ public class LocalNetworkTest {
}
@Override
- public void handleMessage(final Message msg) {
+ public void handleMessage(Message msg) {
messages.addLast(msg);
}
@Override
- public void handleReply(final Reply reply) {
+ public void handleReply(Reply reply) {
replies.addLast(reply);
}
}