diff options
Diffstat (limited to 'messagebus/src/test')
-rw-r--r-- | messagebus/src/test/java/com/yahoo/messagebus/network/local/LocalNetworkTest.java | 90 |
1 files changed, 74 insertions, 16 deletions
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/network/local/LocalNetworkTest.java b/messagebus/src/test/java/com/yahoo/messagebus/network/local/LocalNetworkTest.java index 296e724a502..9db0bda04af 100644 --- a/messagebus/src/test/java/com/yahoo/messagebus/network/local/LocalNetworkTest.java +++ b/messagebus/src/test/java/com/yahoo/messagebus/network/local/LocalNetworkTest.java @@ -1,19 +1,7 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.messagebus.network.local; -import com.yahoo.messagebus.DestinationSession; -import com.yahoo.messagebus.DestinationSessionParams; -import com.yahoo.messagebus.EmptyReply; -import com.yahoo.messagebus.IntermediateSession; -import com.yahoo.messagebus.IntermediateSessionParams; -import com.yahoo.messagebus.Message; -import com.yahoo.messagebus.MessageBus; -import com.yahoo.messagebus.MessageBusParams; -import com.yahoo.messagebus.MessageHandler; -import com.yahoo.messagebus.Reply; -import com.yahoo.messagebus.ReplyHandler; -import com.yahoo.messagebus.SourceSession; -import com.yahoo.messagebus.SourceSessionParams; +import com.yahoo.messagebus.*; import com.yahoo.messagebus.routing.Hop; import com.yahoo.messagebus.routing.Route; import com.yahoo.messagebus.test.SimpleMessage; @@ -27,7 +15,9 @@ import java.util.concurrent.TimeUnit; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; /** * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen Hult</a> @@ -91,6 +81,68 @@ public class LocalNetworkTest { server.mbus.destroy(); } + @Test + public void requireThatBlockingSendTimeOutInSendQ() throws InterruptedException { + final LocalWire wire = new LocalWire(); + + final Server serverA = new Server(wire); + final SourceSession source = serverA.newSourceSession(new StaticThrottlePolicy().setMaxPendingCount(1)); + + final Server serverB = new Server(wire); + final IntermediateSession intermediate = serverB.newIntermediateSession(); + + final Server serverC = new Server(wire); + final DestinationSession destination = serverC.newDestinationSession(); + + Message msg = new SimpleMessage("foo"); + msg.setRoute(new Route().addHop(Hop.parse(intermediate.getConnectionSpec())) + .addHop(Hop.parse(destination.getConnectionSpec()))); + assertThat(source.sendBlocking(msg).isAccepted(), is(true)); + Message msg2 = new SimpleMessage("foo2"); + msg2.setRoute(new Route().addHop(Hop.parse(intermediate.getConnectionSpec())) + .addHop(Hop.parse(destination.getConnectionSpec()))); + long TIMEOUT = 1000; + msg2.setTimeRemaining(TIMEOUT); + long start = System.currentTimeMillis(); + Result res = source.sendBlocking(msg2); + assertThat(res.isAccepted(), is(false)); + assertEquals(ErrorCode.TIMEOUT, res.getError().getCode()); + assertTrue(res.getError().getMessage().endsWith("Timed out in sendQ")); + long end = System.currentTimeMillis(); + assertTrue(end - start > TIMEOUT); + assertTrue(end - start < 2*TIMEOUT); + + msg = serverB.messages.poll(60, TimeUnit.SECONDS); + assertThat(msg, instanceOf(SimpleMessage.class)); + assertThat(((SimpleMessage)msg).getValue(), is("foo")); + intermediate.forward(msg); + + msg = serverC.messages.poll(60, TimeUnit.SECONDS); + assertThat(msg, instanceOf(SimpleMessage.class)); + assertThat(((SimpleMessage)msg).getValue(), is("foo")); + Reply reply = new SimpleReply("bar"); + reply.swapState(msg); + destination.reply(reply); + + reply = serverB.replies.poll(60, TimeUnit.SECONDS); + assertThat(reply, instanceOf(SimpleReply.class)); + assertThat(((SimpleReply)reply).getValue(), is("bar")); + intermediate.forward(reply); + + reply = serverA.replies.poll(60, TimeUnit.SECONDS); + assertEquals(ErrorCode.TIMEOUT, reply.getError(0).getCode()); + assertTrue(reply.getError(0).getMessage().endsWith("Timed out in sendQ")); + + reply = serverA.replies.poll(60, TimeUnit.SECONDS); + assertThat(reply, instanceOf(SimpleReply.class)); + assertThat(((SimpleReply)reply).getValue(), is("bar")); + + serverA.mbus.destroy(); + serverB.mbus.destroy(); + serverC.mbus.destroy(); + + } + private static class Server implements MessageHandler, ReplyHandler { final MessageBus mbus; @@ -104,10 +156,16 @@ public class LocalNetworkTest { } SourceSession newSourceSession() { - return mbus.createSourceSession( - new SourceSessionParams().setTimeout(600.0).setReplyHandler(this)); + return mbus.createSourceSession(new SourceSessionParams() + .setTimeout(600.0) + .setReplyHandler(this)); + } + SourceSession newSourceSession(ThrottlePolicy throttlePolicy) { + return mbus.createSourceSession(new SourceSessionParams() + .setTimeout(600.0) + .setReplyHandler(this) + .setThrottlePolicy(throttlePolicy)); } - IntermediateSession newIntermediateSession() { return mbus.createIntermediateSession(new IntermediateSessionParams() .setMessageHandler(this) |