summaryrefslogtreecommitdiffstats
path: root/messagebus/src/test
diff options
context:
space:
mode:
Diffstat (limited to 'messagebus/src/test')
-rw-r--r--messagebus/src/test/java/com/yahoo/messagebus/network/local/LocalNetworkTest.java90
1 files changed, 74 insertions, 16 deletions
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/network/local/LocalNetworkTest.java b/messagebus/src/test/java/com/yahoo/messagebus/network/local/LocalNetworkTest.java
index 296e724a502..9db0bda04af 100644
--- a/messagebus/src/test/java/com/yahoo/messagebus/network/local/LocalNetworkTest.java
+++ b/messagebus/src/test/java/com/yahoo/messagebus/network/local/LocalNetworkTest.java
@@ -1,19 +1,7 @@
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.messagebus.network.local;
-import com.yahoo.messagebus.DestinationSession;
-import com.yahoo.messagebus.DestinationSessionParams;
-import com.yahoo.messagebus.EmptyReply;
-import com.yahoo.messagebus.IntermediateSession;
-import com.yahoo.messagebus.IntermediateSessionParams;
-import com.yahoo.messagebus.Message;
-import com.yahoo.messagebus.MessageBus;
-import com.yahoo.messagebus.MessageBusParams;
-import com.yahoo.messagebus.MessageHandler;
-import com.yahoo.messagebus.Reply;
-import com.yahoo.messagebus.ReplyHandler;
-import com.yahoo.messagebus.SourceSession;
-import com.yahoo.messagebus.SourceSessionParams;
+import com.yahoo.messagebus.*;
import com.yahoo.messagebus.routing.Hop;
import com.yahoo.messagebus.routing.Route;
import com.yahoo.messagebus.test.SimpleMessage;
@@ -27,7 +15,9 @@ import java.util.concurrent.TimeUnit;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
/**
* @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen Hult</a>
@@ -91,6 +81,68 @@ public class LocalNetworkTest {
server.mbus.destroy();
}
+ @Test
+ public void requireThatBlockingSendTimeOutInSendQ() throws InterruptedException {
+ final LocalWire wire = new LocalWire();
+
+ final Server serverA = new Server(wire);
+ final SourceSession source = serverA.newSourceSession(new StaticThrottlePolicy().setMaxPendingCount(1));
+
+ final Server serverB = new Server(wire);
+ final IntermediateSession intermediate = serverB.newIntermediateSession();
+
+ final Server serverC = new Server(wire);
+ final DestinationSession destination = serverC.newDestinationSession();
+
+ Message msg = new SimpleMessage("foo");
+ msg.setRoute(new Route().addHop(Hop.parse(intermediate.getConnectionSpec()))
+ .addHop(Hop.parse(destination.getConnectionSpec())));
+ assertThat(source.sendBlocking(msg).isAccepted(), is(true));
+ Message msg2 = new SimpleMessage("foo2");
+ msg2.setRoute(new Route().addHop(Hop.parse(intermediate.getConnectionSpec()))
+ .addHop(Hop.parse(destination.getConnectionSpec())));
+ long TIMEOUT = 1000;
+ msg2.setTimeRemaining(TIMEOUT);
+ long start = System.currentTimeMillis();
+ Result res = source.sendBlocking(msg2);
+ assertThat(res.isAccepted(), is(false));
+ assertEquals(ErrorCode.TIMEOUT, res.getError().getCode());
+ assertTrue(res.getError().getMessage().endsWith("Timed out in sendQ"));
+ long end = System.currentTimeMillis();
+ assertTrue(end - start > TIMEOUT);
+ assertTrue(end - start < 2*TIMEOUT);
+
+ msg = serverB.messages.poll(60, TimeUnit.SECONDS);
+ assertThat(msg, instanceOf(SimpleMessage.class));
+ assertThat(((SimpleMessage)msg).getValue(), is("foo"));
+ intermediate.forward(msg);
+
+ msg = serverC.messages.poll(60, TimeUnit.SECONDS);
+ assertThat(msg, instanceOf(SimpleMessage.class));
+ assertThat(((SimpleMessage)msg).getValue(), is("foo"));
+ Reply reply = new SimpleReply("bar");
+ reply.swapState(msg);
+ destination.reply(reply);
+
+ reply = serverB.replies.poll(60, TimeUnit.SECONDS);
+ assertThat(reply, instanceOf(SimpleReply.class));
+ assertThat(((SimpleReply)reply).getValue(), is("bar"));
+ intermediate.forward(reply);
+
+ reply = serverA.replies.poll(60, TimeUnit.SECONDS);
+ assertEquals(ErrorCode.TIMEOUT, reply.getError(0).getCode());
+ assertTrue(reply.getError(0).getMessage().endsWith("Timed out in sendQ"));
+
+ reply = serverA.replies.poll(60, TimeUnit.SECONDS);
+ assertThat(reply, instanceOf(SimpleReply.class));
+ assertThat(((SimpleReply)reply).getValue(), is("bar"));
+
+ serverA.mbus.destroy();
+ serverB.mbus.destroy();
+ serverC.mbus.destroy();
+
+ }
+
private static class Server implements MessageHandler, ReplyHandler {
final MessageBus mbus;
@@ -104,10 +156,16 @@ public class LocalNetworkTest {
}
SourceSession newSourceSession() {
- return mbus.createSourceSession(
- new SourceSessionParams().setTimeout(600.0).setReplyHandler(this));
+ return mbus.createSourceSession(new SourceSessionParams()
+ .setTimeout(600.0)
+ .setReplyHandler(this));
+ }
+ SourceSession newSourceSession(ThrottlePolicy throttlePolicy) {
+ return mbus.createSourceSession(new SourceSessionParams()
+ .setTimeout(600.0)
+ .setReplyHandler(this)
+ .setThrottlePolicy(throttlePolicy));
}
-
IntermediateSession newIntermediateSession() {
return mbus.createIntermediateSession(new IntermediateSessionParams()
.setMessageHandler(this)