summaryrefslogtreecommitdiffstats
path: root/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/shared/SharedSourceSessionTestCase.java
diff options
context:
space:
mode:
Diffstat (limited to 'jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/shared/SharedSourceSessionTestCase.java')
-rw-r--r--jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/shared/SharedSourceSessionTestCase.java94
1 files changed, 94 insertions, 0 deletions
diff --git a/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/shared/SharedSourceSessionTestCase.java b/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/shared/SharedSourceSessionTestCase.java
new file mode 100644
index 00000000000..e58c53ee91a
--- /dev/null
+++ b/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/shared/SharedSourceSessionTestCase.java
@@ -0,0 +1,94 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.shared;
+
+import com.yahoo.jrt.ListenFailedException;
+import com.yahoo.jrt.slobrok.server.Slobrok;
+import com.yahoo.messagebus.Message;
+import com.yahoo.messagebus.MessageBusParams;
+import com.yahoo.messagebus.SourceSessionParams;
+import com.yahoo.messagebus.jdisc.test.RemoteServer;
+import com.yahoo.messagebus.jdisc.test.ReplyQueue;
+import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
+import com.yahoo.messagebus.routing.Route;
+import com.yahoo.messagebus.test.SimpleMessage;
+import com.yahoo.messagebus.test.SimpleProtocol;
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.*;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public class SharedSourceSessionTestCase {
+
+ @Test
+ public void requireThatReplyHandlerCanNotBeSet() {
+ try {
+ newSourceSession(new SourceSessionParams().setReplyHandler(new ReplyQueue()));
+ fail();
+ } catch (IllegalArgumentException e) {
+ assertEquals("Reply handler must be null.", e.getMessage());
+ }
+ }
+
+ @Test
+ public void requireThatSessionIsClosedOnDestroy() {
+ SharedSourceSession session = newSourceSession(new SourceSessionParams());
+ session.release();
+ assertFalse("SourceSession not destroyed by release().", session.session().destroy());
+ }
+
+ @Test
+ public void requireThatMbusIsReleasedOnDestroy() {
+ Slobrok slobrok = null;
+ try {
+ slobrok = new Slobrok();
+ } catch (ListenFailedException e) {
+ fail();
+ }
+ RPCNetworkParams netParams = new RPCNetworkParams().setSlobrokConfigId(slobrok.configId());
+ SharedMessageBus mbus = SharedMessageBus.newInstance(new MessageBusParams(), netParams);
+ SharedSourceSession session = mbus.newSourceSession(new SourceSessionParams());
+ mbus.release();
+ session.release();
+ assertFalse("MessageBus not destroyed by release().", mbus.messageBus().destroy());
+ }
+
+ @Test
+ public void requireThatSessionCanSendMessage() throws InterruptedException {
+ RemoteServer server = RemoteServer.newInstanceWithInternSlobrok();
+ SharedSourceSession session = newSourceSession(server.slobrokId(),
+ new SourceSessionParams());
+ ReplyQueue queue = new ReplyQueue();
+ Message msg = new SimpleMessage("foo").setRoute(Route.parse(server.connectionSpec()));
+ msg.pushHandler(queue);
+ assertTrue(session.sendMessage(msg).isAccepted());
+ assertNotNull(msg = server.awaitMessage(60, TimeUnit.SECONDS));
+ server.ackMessage(msg);
+ assertNotNull(queue.awaitReply(60, TimeUnit.SECONDS));
+
+ session.release();
+ server.close();
+ }
+
+ private static SharedSourceSession newSourceSession(SourceSessionParams params) {
+ Slobrok slobrok = null;
+ try {
+ slobrok = new Slobrok();
+ } catch (ListenFailedException e) {
+ fail();
+ }
+ return newSourceSession(slobrok.configId(), params);
+ }
+
+ private static SharedSourceSession newSourceSession(String slobrokId, SourceSessionParams params) {
+ RPCNetworkParams netParams = new RPCNetworkParams().setSlobrokConfigId(slobrokId);
+ MessageBusParams mbusParams = new MessageBusParams().addProtocol(new SimpleProtocol());
+ SharedMessageBus mbus = SharedMessageBus.newInstance(mbusParams, netParams);
+ SharedSourceSession session = mbus.newSourceSession(params);
+ mbus.release();
+ return session;
+ }
+}