diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2017-01-06 00:43:50 +0100 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2017-01-06 00:43:50 +0100 |
commit | 236832a9528552422ad77d84dadb381052db5de6 (patch) | |
tree | 819825cbed96744fed9709651d423d2f365ff956 /messagebus/src/test | |
parent | 3df8e56d3bead686aae9c362deb4b351415cca81 (diff) |
- Added a test for sendBlocking with and without blocking sendQ.
- Redesigned based on @bratseth comments.
- Now you will be timed out either by the reply thread in mbus or by the
new thread that will handle sending of leftover messages.
Timeout is not honoured correctly if there is a variation in timeout and system overload.
A short timeout message might not be expired if an earlier message with alonger timeout is still on the Q.
But I will let factory have a go at it before completing.
Diffstat (limited to 'messagebus/src/test')
-rw-r--r-- | messagebus/src/test/java/com/yahoo/messagebus/network/local/LocalNetworkTest.java | 90 |
1 files changed, 74 insertions, 16 deletions
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/network/local/LocalNetworkTest.java b/messagebus/src/test/java/com/yahoo/messagebus/network/local/LocalNetworkTest.java index 296e724a502..9db0bda04af 100644 --- a/messagebus/src/test/java/com/yahoo/messagebus/network/local/LocalNetworkTest.java +++ b/messagebus/src/test/java/com/yahoo/messagebus/network/local/LocalNetworkTest.java @@ -1,19 +1,7 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.messagebus.network.local; -import com.yahoo.messagebus.DestinationSession; -import com.yahoo.messagebus.DestinationSessionParams; -import com.yahoo.messagebus.EmptyReply; -import com.yahoo.messagebus.IntermediateSession; -import com.yahoo.messagebus.IntermediateSessionParams; -import com.yahoo.messagebus.Message; -import com.yahoo.messagebus.MessageBus; -import com.yahoo.messagebus.MessageBusParams; -import com.yahoo.messagebus.MessageHandler; -import com.yahoo.messagebus.Reply; -import com.yahoo.messagebus.ReplyHandler; -import com.yahoo.messagebus.SourceSession; -import com.yahoo.messagebus.SourceSessionParams; +import com.yahoo.messagebus.*; import com.yahoo.messagebus.routing.Hop; import com.yahoo.messagebus.routing.Route; import com.yahoo.messagebus.test.SimpleMessage; @@ -27,7 +15,9 @@ import java.util.concurrent.TimeUnit; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; /** * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen Hult</a> @@ -91,6 +81,68 @@ public class LocalNetworkTest { server.mbus.destroy(); } + @Test + public void requireThatBlockingSendTimeOutInSendQ() throws InterruptedException { + final LocalWire wire = new LocalWire(); + + final Server serverA = new Server(wire); + final SourceSession source = serverA.newSourceSession(new StaticThrottlePolicy().setMaxPendingCount(1)); + + final Server serverB = new Server(wire); + final IntermediateSession intermediate = serverB.newIntermediateSession(); + + final Server serverC = new Server(wire); + final DestinationSession destination = serverC.newDestinationSession(); + + Message msg = new SimpleMessage("foo"); + msg.setRoute(new Route().addHop(Hop.parse(intermediate.getConnectionSpec())) + .addHop(Hop.parse(destination.getConnectionSpec()))); + assertThat(source.sendBlocking(msg).isAccepted(), is(true)); + Message msg2 = new SimpleMessage("foo2"); + msg2.setRoute(new Route().addHop(Hop.parse(intermediate.getConnectionSpec())) + .addHop(Hop.parse(destination.getConnectionSpec()))); + long TIMEOUT = 1000; + msg2.setTimeRemaining(TIMEOUT); + long start = System.currentTimeMillis(); + Result res = source.sendBlocking(msg2); + assertThat(res.isAccepted(), is(false)); + assertEquals(ErrorCode.TIMEOUT, res.getError().getCode()); + assertTrue(res.getError().getMessage().endsWith("Timed out in sendQ")); + long end = System.currentTimeMillis(); + assertTrue(end - start > TIMEOUT); + assertTrue(end - start < 2*TIMEOUT); + + msg = serverB.messages.poll(60, TimeUnit.SECONDS); + assertThat(msg, instanceOf(SimpleMessage.class)); + assertThat(((SimpleMessage)msg).getValue(), is("foo")); + intermediate.forward(msg); + + msg = serverC.messages.poll(60, TimeUnit.SECONDS); + assertThat(msg, instanceOf(SimpleMessage.class)); + assertThat(((SimpleMessage)msg).getValue(), is("foo")); + Reply reply = new SimpleReply("bar"); + reply.swapState(msg); + destination.reply(reply); + + reply = serverB.replies.poll(60, TimeUnit.SECONDS); + assertThat(reply, instanceOf(SimpleReply.class)); + assertThat(((SimpleReply)reply).getValue(), is("bar")); + intermediate.forward(reply); + + reply = serverA.replies.poll(60, TimeUnit.SECONDS); + assertEquals(ErrorCode.TIMEOUT, reply.getError(0).getCode()); + assertTrue(reply.getError(0).getMessage().endsWith("Timed out in sendQ")); + + reply = serverA.replies.poll(60, TimeUnit.SECONDS); + assertThat(reply, instanceOf(SimpleReply.class)); + assertThat(((SimpleReply)reply).getValue(), is("bar")); + + serverA.mbus.destroy(); + serverB.mbus.destroy(); + serverC.mbus.destroy(); + + } + private static class Server implements MessageHandler, ReplyHandler { final MessageBus mbus; @@ -104,10 +156,16 @@ public class LocalNetworkTest { } SourceSession newSourceSession() { - return mbus.createSourceSession( - new SourceSessionParams().setTimeout(600.0).setReplyHandler(this)); + return mbus.createSourceSession(new SourceSessionParams() + .setTimeout(600.0) + .setReplyHandler(this)); + } + SourceSession newSourceSession(ThrottlePolicy throttlePolicy) { + return mbus.createSourceSession(new SourceSessionParams() + .setTimeout(600.0) + .setReplyHandler(this) + .setThrottlePolicy(throttlePolicy)); } - IntermediateSession newIntermediateSession() { return mbus.createIntermediateSession(new IntermediateSessionParams() .setMessageHandler(this) |