aboutsummaryrefslogtreecommitdiffstats
path: root/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/BasicNetworkTestCase.java
blob: 6612421e437d52e4bee5dfbfc5c898db8c2b0244 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.messagebus.network.rpc;

import com.yahoo.concurrent.SystemTimer;
import com.yahoo.jrt.ListenFailedException;
import com.yahoo.jrt.slobrok.server.Slobrok;
import com.yahoo.messagebus.*;
import com.yahoo.messagebus.network.rpc.test.TestServer;
import com.yahoo.messagebus.routing.Route;
import com.yahoo.messagebus.routing.RoutingTableSpec;
import com.yahoo.messagebus.test.Receptor;
import com.yahoo.messagebus.test.SimpleMessage;
import com.yahoo.messagebus.test.SimpleProtocol;
import com.yahoo.messagebus.test.SimpleReply;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.Arrays;
import java.util.List;

import static org.junit.jupiter.api.Assertions.*;

/**
 * @author havardpe
 */
public class BasicNetworkTestCase {

    Slobrok     slobrok;
    TestServer src;
    TestServer  pxy;
    TestServer  dst;

    @BeforeEach
    public void setUp() throws ListenFailedException {
        RoutingTableSpec table = new RoutingTableSpec(SimpleProtocol.NAME);
        table.addHop("pxy", "test/pxy/session", List.of("test/pxy/session"));
        table.addHop("dst", "test/dst/session", List.of("test/dst/session"));
        table.addRoute("test", Arrays.asList("pxy", "dst"));
        slobrok = new Slobrok();
        src = new TestServer("test/src", table, slobrok, null);
        pxy = new TestServer("test/pxy", table, slobrok, null);
        dst = new TestServer("test/dst", table, slobrok, null);
    }

    @AfterEach
    public void tearDown() {
        dst.destroy();
        pxy.destroy();
        src.destroy();
        slobrok.stop();
    }

    @Test
    void testNetwork() {
        // set up receptors
        Receptor src_rr = new Receptor();
        Receptor pxy_mr = new Receptor();
        Receptor pxy_rr = new Receptor();
        Receptor dst_mr = new Receptor();

        // set up sessions
        SourceSessionParams sp = new SourceSessionParams();
        sp.setTimeout(30.0);

        SourceSession       ss = src.mb.createSourceSession(src_rr, sp);
        IntermediateSession is = pxy.mb.createIntermediateSession("session", true, pxy_mr, pxy_rr);
        DestinationSession  ds = dst.mb.createDestinationSession("session", true, dst_mr);

        // wait for slobrok registration
        assertTrue(src.waitSlobrok("test/pxy/session", 1));
        assertTrue(src.waitSlobrok("test/dst/session", 1));
        assertTrue(pxy.waitSlobrok("test/dst/session", 1));

        // send message on client
        ss.send(new SimpleMessage("test message"), "test");

        // check message on proxy
        Message msg = pxy_mr.getMessage(60);
        assertNotNull(msg);
        assertEquals(SimpleProtocol.MESSAGE, msg.getType());
        SimpleMessage sm = (SimpleMessage) msg;
        assertEquals("test message", sm.getValue());

        // forward message on proxy
        sm.setValue(sm.getValue() + " pxy");
        is.forward(sm);

        // check message on server
        msg = dst_mr.getMessage(60);
        assertNotNull(msg);
        assertEquals(SimpleProtocol.MESSAGE, msg.getType());
        sm = (SimpleMessage) msg;
        assertEquals("test message pxy", sm.getValue());

        // send reply on server
        SimpleReply sr = new SimpleReply("test reply");
        sm.swapState(sr);
        ds.reply(sr);

        // check reply on proxy
        Reply reply = pxy_rr.getReply(60);
        assertNotNull(reply);
        assertEquals(SimpleProtocol.REPLY, reply.getType());
        sr = (SimpleReply) reply;
        assertEquals("test reply", sr.getValue());

        // forward reply on proxy
        sr.setValue(sr.getValue() + " pxy");
        is.forward(sr);

        // check reply on client
        reply = src_rr.getReply(60);
        assertNotNull(reply);
        assertEquals(SimpleProtocol.REPLY, reply.getType());
        sr = (SimpleReply) reply;
        assertEquals("test reply pxy", sr.getValue());

        ss.destroy();
        is.destroy();
        ds.destroy();
    }

    @Test
    void testTimeoutsFollowMessage() {
        SourceSessionParams params = new SourceSessionParams().setTimeout(600.0);
        SourceSession ss = src.mb.createSourceSession(new Receptor(), params);
        DestinationSession ds = dst.mb.createDestinationSession("session", true, new Receptor());
        assertTrue(src.waitSlobrok("test/dst/session", 1));

        // Test default timeouts being set.
        Message msg = new SimpleMessage("msg");
        msg.getTrace().setLevel(9);
        long now = SystemTimer.INSTANCE.milliTime();
        assertTrue(ss.send(msg, Route.parse("dst")).isAccepted());

        assertNotNull(msg = ((Receptor) ds.getMessageHandler()).getMessage(60));
        assertTrue(msg.getTimeReceived() >= now);
        assertTrue(params.getTimeout() * 1000 >= msg.getTimeRemaining());
        ds.acknowledge(msg);

        assertNotNull(((Receptor) ss.getReplyHandler()).getReply(60));

        // Test default timeouts being overwritten.
        msg = new SimpleMessage("msg");
        msg.getTrace().setLevel(9);
        msg.setTimeRemaining(2 * (long) (params.getTimeout() * 1000));
        assertTrue(ss.send(msg, Route.parse("dst")).isAccepted());

        assertNotNull(msg = ((Receptor) ds.getMessageHandler()).getMessage(60));
        assertTrue(params.getTimeout() * 1000 < msg.getTimeRemaining());
        ds.acknowledge(msg);

        assertNotNull(((Receptor) ss.getReplyHandler()).getReply(60));

        ss.destroy();
        ds.destroy();
    }

}