aboutsummaryrefslogtreecommitdiffstats
path: root/container-messagebus/src/test/java/com/yahoo/messagebus/shared/SharedDestinationSessionTestCase.java
blob: 4efb34af00d766372b22753feb8662b1df2c3874 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.messagebus.shared;

import com.yahoo.cloud.config.SlobroksConfig;
import com.yahoo.jrt.ListenFailedException;
import com.yahoo.jrt.slobrok.server.Slobrok;
import com.yahoo.messagebus.*;
import com.yahoo.messagebus.jdisc.test.MessageQueue;
import com.yahoo.messagebus.jdisc.test.RemoteClient;
import com.yahoo.messagebus.jdisc.test.ReplyQueue;
import com.yahoo.messagebus.jdisc.test.TestUtils;
import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
import com.yahoo.messagebus.routing.Route;
import com.yahoo.messagebus.test.SimpleMessage;
import com.yahoo.messagebus.test.SimpleProtocol;
import com.yahoo.messagebus.test.SimpleReply;
import org.junit.Test;

import java.util.concurrent.TimeUnit;

import static org.junit.Assert.*;

/**
 * @author Simon Thoresen Hult
 */
public class SharedDestinationSessionTestCase {

    @Test
    public void requireThatMessageHandlerCanBeAccessed() {
        SharedDestinationSession session = newDestinationSession();
        assertNull(session.getMessageHandler());

        MessageQueue handler = new MessageQueue();
        session.setMessageHandler(handler);
        assertSame(handler, session.getMessageHandler());
    }

    @Test
    public void requireThatMessageHandlerCanOnlyBeSetOnce() {
        SharedDestinationSession session = newDestinationSession();
        session.setMessageHandler(new MessageQueue());
        try {
            session.setMessageHandler(new MessageQueue());
            fail();
        } catch (IllegalStateException e) {
            assertEquals("Message handler already registered.", e.getMessage());
        }
        session.release();
    }

    @Test
    public void requireThatMessageHandlerIsCalled() throws InterruptedException {
        SharedDestinationSession session = newDestinationSession();
        MessageQueue queue = new MessageQueue();
        session.setMessageHandler(queue);
        session.handleMessage(new SimpleMessage("foo"));
        assertNotNull(queue.awaitMessage(60, TimeUnit.SECONDS));
        session.release();
    }

    @Test
    public void requireThatSessionRepliesIfMessageHandlerIsNull() throws InterruptedException {
        SharedDestinationSession session = newDestinationSession();
        Message msg = new SimpleMessage("foo");
        ReplyQueue queue = new ReplyQueue();
        msg.pushHandler(queue);
        session.handleMessage(msg);
        Reply reply = queue.awaitReply(60, TimeUnit.SECONDS);
        assertNotNull(reply);
        assertEquals(1, reply.getNumErrors());
        assertEquals(ErrorCode.SESSION_BUSY, reply.getError(0).getCode());
        session.release();
    }

    @Test
    public void requireThatSessionIsClosedOnDestroy() {
        SharedDestinationSession session = newDestinationSession();
        session.release();
        assertFalse("DestinationSession not destroyed by release().", session.session().destroy());
    }

    @Test
    public void requireThatMbusIsReleasedOnDestroy() {
        Slobrok slobrok = null;
        try {
            slobrok = new Slobrok();
        } catch (ListenFailedException e) {
            fail();
        }
        RPCNetworkParams netParams = new RPCNetworkParams().setSlobrokConfigId(slobrok.configId());
        SharedMessageBus mbus = SharedMessageBus.newInstance(new MessageBusParams(), netParams);
        SharedDestinationSession session = mbus.newDestinationSession(new DestinationSessionParams());
        mbus.release();
        session.release();
        assertFalse("MessageBus not destroyed by release().", mbus.messageBus().destroy());
    }

    @Test
    public void requireThatSessionCanSendReply() throws InterruptedException {
        RemoteClient client = RemoteClient.newInstanceWithInternSlobrok(true);
        MessageQueue queue = new MessageQueue();
        DestinationSessionParams params = new DestinationSessionParams().setMessageHandler(queue);
        SharedDestinationSession session = newDestinationSession(client.slobroksConfig(), params);
        Route route = Route.parse(session.connectionSpec());

        assertTrue(client.sendMessage(new SimpleMessage("foo").setRoute(route)).isAccepted());
        Message msg = queue.awaitMessage(60, TimeUnit.SECONDS);
        assertNotNull(msg);
        Reply reply = new SimpleReply("bar");
        reply.swapState(msg);
        session.sendReply(reply);
        assertNotNull(client.awaitReply(60, TimeUnit.SECONDS));

        session.release();
        client.close();
    }

    private static SharedDestinationSession newDestinationSession() {
        Slobrok slobrok = null;
        try {
            slobrok = new Slobrok();
        } catch (ListenFailedException e) {
            fail();
        }
        return newDestinationSession(TestUtils.configFor(slobrok), new DestinationSessionParams());
    }

    private static SharedDestinationSession newDestinationSession(SlobroksConfig slobroksConfig, DestinationSessionParams params) {
        RPCNetworkParams netParams = new RPCNetworkParams().setSlobroksConfig(slobroksConfig);
        MessageBusParams mbusParams = new MessageBusParams().addProtocol(new SimpleProtocol());
        SharedMessageBus mbus = SharedMessageBus.newInstance(mbusParams, netParams);
        SharedDestinationSession session = mbus.newDestinationSession(params);
        mbus.release();
        return session;
    }
}