summaryrefslogtreecommitdiffstats
path: root/messagebus/src/test
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2017-01-06 00:43:50 +0100
committerHenning Baldersheim <balder@yahoo-inc.com>2017-01-06 00:43:50 +0100
commit236832a9528552422ad77d84dadb381052db5de6 (patch)
tree819825cbed96744fed9709651d423d2f365ff956 /messagebus/src/test
parent3df8e56d3bead686aae9c362deb4b351415cca81 (diff)
- Added a test for sendBlocking with and without blocking sendQ.
- Redesigned based on @bratseth comments. - Now you will be timed out either by the reply thread in mbus or by the new thread that will handle sending of leftover messages. Timeout is not honoured correctly if there is a variation in timeout and system overload. A short timeout message might not be expired if an earlier message with alonger timeout is still on the Q. But I will let factory have a go at it before completing.
Diffstat (limited to 'messagebus/src/test')
-rw-r--r--messagebus/src/test/java/com/yahoo/messagebus/network/local/LocalNetworkTest.java90
1 files changed, 74 insertions, 16 deletions
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/network/local/LocalNetworkTest.java b/messagebus/src/test/java/com/yahoo/messagebus/network/local/LocalNetworkTest.java
index 296e724a502..9db0bda04af 100644
--- a/messagebus/src/test/java/com/yahoo/messagebus/network/local/LocalNetworkTest.java
+++ b/messagebus/src/test/java/com/yahoo/messagebus/network/local/LocalNetworkTest.java
@@ -1,19 +1,7 @@
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.messagebus.network.local;
-import com.yahoo.messagebus.DestinationSession;
-import com.yahoo.messagebus.DestinationSessionParams;
-import com.yahoo.messagebus.EmptyReply;
-import com.yahoo.messagebus.IntermediateSession;
-import com.yahoo.messagebus.IntermediateSessionParams;
-import com.yahoo.messagebus.Message;
-import com.yahoo.messagebus.MessageBus;
-import com.yahoo.messagebus.MessageBusParams;
-import com.yahoo.messagebus.MessageHandler;
-import com.yahoo.messagebus.Reply;
-import com.yahoo.messagebus.ReplyHandler;
-import com.yahoo.messagebus.SourceSession;
-import com.yahoo.messagebus.SourceSessionParams;
+import com.yahoo.messagebus.*;
import com.yahoo.messagebus.routing.Hop;
import com.yahoo.messagebus.routing.Route;
import com.yahoo.messagebus.test.SimpleMessage;
@@ -27,7 +15,9 @@ import java.util.concurrent.TimeUnit;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
/**
* @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen Hult</a>
@@ -91,6 +81,68 @@ public class LocalNetworkTest {
server.mbus.destroy();
}
+ @Test
+ public void requireThatBlockingSendTimeOutInSendQ() throws InterruptedException {
+ final LocalWire wire = new LocalWire();
+
+ final Server serverA = new Server(wire);
+ final SourceSession source = serverA.newSourceSession(new StaticThrottlePolicy().setMaxPendingCount(1));
+
+ final Server serverB = new Server(wire);
+ final IntermediateSession intermediate = serverB.newIntermediateSession();
+
+ final Server serverC = new Server(wire);
+ final DestinationSession destination = serverC.newDestinationSession();
+
+ Message msg = new SimpleMessage("foo");
+ msg.setRoute(new Route().addHop(Hop.parse(intermediate.getConnectionSpec()))
+ .addHop(Hop.parse(destination.getConnectionSpec())));
+ assertThat(source.sendBlocking(msg).isAccepted(), is(true));
+ Message msg2 = new SimpleMessage("foo2");
+ msg2.setRoute(new Route().addHop(Hop.parse(intermediate.getConnectionSpec()))
+ .addHop(Hop.parse(destination.getConnectionSpec())));
+ long TIMEOUT = 1000;
+ msg2.setTimeRemaining(TIMEOUT);
+ long start = System.currentTimeMillis();
+ Result res = source.sendBlocking(msg2);
+ assertThat(res.isAccepted(), is(false));
+ assertEquals(ErrorCode.TIMEOUT, res.getError().getCode());
+ assertTrue(res.getError().getMessage().endsWith("Timed out in sendQ"));
+ long end = System.currentTimeMillis();
+ assertTrue(end - start > TIMEOUT);
+ assertTrue(end - start < 2*TIMEOUT);
+
+ msg = serverB.messages.poll(60, TimeUnit.SECONDS);
+ assertThat(msg, instanceOf(SimpleMessage.class));
+ assertThat(((SimpleMessage)msg).getValue(), is("foo"));
+ intermediate.forward(msg);
+
+ msg = serverC.messages.poll(60, TimeUnit.SECONDS);
+ assertThat(msg, instanceOf(SimpleMessage.class));
+ assertThat(((SimpleMessage)msg).getValue(), is("foo"));
+ Reply reply = new SimpleReply("bar");
+ reply.swapState(msg);
+ destination.reply(reply);
+
+ reply = serverB.replies.poll(60, TimeUnit.SECONDS);
+ assertThat(reply, instanceOf(SimpleReply.class));
+ assertThat(((SimpleReply)reply).getValue(), is("bar"));
+ intermediate.forward(reply);
+
+ reply = serverA.replies.poll(60, TimeUnit.SECONDS);
+ assertEquals(ErrorCode.TIMEOUT, reply.getError(0).getCode());
+ assertTrue(reply.getError(0).getMessage().endsWith("Timed out in sendQ"));
+
+ reply = serverA.replies.poll(60, TimeUnit.SECONDS);
+ assertThat(reply, instanceOf(SimpleReply.class));
+ assertThat(((SimpleReply)reply).getValue(), is("bar"));
+
+ serverA.mbus.destroy();
+ serverB.mbus.destroy();
+ serverC.mbus.destroy();
+
+ }
+
private static class Server implements MessageHandler, ReplyHandler {
final MessageBus mbus;
@@ -104,10 +156,16 @@ public class LocalNetworkTest {
}
SourceSession newSourceSession() {
- return mbus.createSourceSession(
- new SourceSessionParams().setTimeout(600.0).setReplyHandler(this));
+ return mbus.createSourceSession(new SourceSessionParams()
+ .setTimeout(600.0)
+ .setReplyHandler(this));
+ }
+ SourceSession newSourceSession(ThrottlePolicy throttlePolicy) {
+ return mbus.createSourceSession(new SourceSessionParams()
+ .setTimeout(600.0)
+ .setReplyHandler(this)
+ .setThrottlePolicy(throttlePolicy));
}
-
IntermediateSession newIntermediateSession() {
return mbus.createIntermediateSession(new IntermediateSessionParams()
.setMessageHandler(this)