1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
|
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.messagebus;
import com.yahoo.jrt.ListenFailedException;
import com.yahoo.jrt.slobrok.server.Slobrok;
import com.yahoo.messagebus.network.rpc.test.TestServer;
import com.yahoo.messagebus.routing.Route;
import com.yahoo.messagebus.test.Receptor;
import com.yahoo.messagebus.test.SimpleMessage;
import com.yahoo.messagebus.test.SimpleReply;
import org.junit.jupiter.api.Test;
import java.net.UnknownHostException;
import static org.junit.jupiter.api.Assertions.*;
/**
 * Tests context propagation, state swapping and discarding for {@link Message} and {@link Reply}.
 */
public class RoutableTestCase {

    /** Tolerance used when comparing retry delays as floating-point values. */
    private static final double DELTA = 0.00000001;

    /**
     * Sends a message carrying a context object from a source session to a destination session,
     * acknowledges it, and verifies that the reply arriving at the source carries the very same
     * context instance.
     *
     * @throws ListenFailedException if the test infrastructure fails to start listening
     */
    @Test
    void testMessageContext() throws ListenFailedException {
        Slobrok slobrok = new Slobrok();
        TestServer srcServer = null;
        TestServer dstServer = null;
        SourceSession srcSession = null;
        DestinationSession dstSession = null;
        try {
            srcServer = new TestServer("src", null, slobrok, null);
            dstServer = new TestServer("dst", null, slobrok, null);
            srcSession = srcServer.mb.createSourceSession(
                    new Receptor(),
                    new SourceSessionParams().setTimeout(600.0));
            dstSession = dstServer.mb.createDestinationSession("session", true, new Receptor());
            assertTrue(srcServer.waitSlobrok("dst/session", 1));

            Object context = new Object();
            Message msg = new SimpleMessage("msg");
            msg.setContext(context);
            assertTrue(srcSession.send(msg, "dst/session", true).isAccepted());

            // Keep the received message in its own variable so the sent one is not shadowed.
            Message received = ((Receptor) dstSession.getMessageHandler()).getMessage(60);
            assertNotNull(received);
            dstSession.acknowledge(received);

            Reply reply = ((Receptor) srcSession.getReplyHandler()).getReply(60);
            assertNotNull(reply);
            assertSame(context, reply.getContext());
        } finally {
            // Tear down even when an assertion fails, so a failing run does not leak sessions,
            // servers or the Slobrok instance. The order matches the original teardown sequence.
            if (srcSession != null) {
                srcSession.destroy();
            }
            if (srcServer != null) {
                srcServer.destroy();
            }
            if (dstSession != null) {
                dstSession.destroy();
            }
            if (dstServer != null) {
                dstServer.destroy();
            }
            slobrok.stop();
        }
    }

    /**
     * Verifies that {@code swapState} between two messages exchanges their route, retry count,
     * time received and time remaining.
     */
    @Test
    void testMessageSwapState() {
        Message foo = new SimpleMessage("foo");
        Route fooRoute = Route.parse("foo");
        foo.setRoute(fooRoute);
        foo.setRetry(1);
        foo.setTimeReceivedNow();
        foo.setTimeRemaining(2);

        Message bar = new SimpleMessage("bar");
        Route barRoute = Route.parse("bar");
        bar.setRoute(barRoute);
        bar.setRetry(3);
        bar.setTimeReceivedNow();
        bar.setTimeRemaining(4);

        foo.swapState(bar);

        assertEquals(barRoute, foo.getRoute());
        assertEquals(fooRoute, bar.getRoute());
        assertEquals(3, foo.getRetry());
        assertEquals(1, bar.getRetry());
        // bar was stamped after foo, so after the swap foo's time received must not be earlier.
        assertTrue(foo.getTimeReceived() >= bar.getTimeReceived());
        assertEquals(4, foo.getTimeRemaining());
        assertEquals(2, bar.getTimeRemaining());
    }

    /**
     * Verifies that {@code swapState} between two replies exchanges their message, retry delay
     * and errors.
     */
    @Test
    void testReplySwapState() {
        Reply foo = new SimpleReply("foo");
        Message fooMsg = new SimpleMessage("foo");
        foo.setMessage(fooMsg);
        foo.setRetryDelay(1);
        foo.addError(new Error(ErrorCode.APP_FATAL_ERROR, "fatal"));
        foo.addError(new Error(ErrorCode.APP_TRANSIENT_ERROR, "transient"));

        Reply bar = new SimpleReply("bar");
        Message barMsg = new SimpleMessage("bar");
        bar.setMessage(barMsg);
        bar.setRetryDelay(2);
        bar.addError(new Error(ErrorCode.ERROR_LIMIT, "err"));

        foo.swapState(bar);

        assertEquals(barMsg, foo.getMessage());
        assertEquals(fooMsg, bar.getMessage());
        assertEquals(2.0, foo.getRetryDelay(), DELTA);
        assertEquals(1.0, bar.getRetryDelay(), DELTA);
        // foo started with two errors and bar with one; the counts must have traded places.
        assertEquals(1, foo.getNumErrors());
        assertEquals(2, bar.getNumErrors());
    }

    /**
     * Verifies that discarding a message does not deliver a reply to the handler pushed on it.
     */
    @Test
    void testMessageDiscard() {
        Receptor handler = new Receptor();
        Message msg = new SimpleMessage("foo");
        msg.pushHandler(handler);
        msg.discard();

        assertNull(handler.getReply(0));
    }

    /**
     * Verifies that discarding a reply, after it has swapped state with a message that had a
     * handler pushed on it, does not deliver a reply to that handler.
     */
    @Test
    void testReplyDiscard() {
        Receptor handler = new Receptor();
        Message msg = new SimpleMessage("foo");
        msg.pushHandler(handler);

        Reply reply = new SimpleReply("bar");
        reply.swapState(msg);
        reply.discard();

        assertNull(handler.getReply(0));
    }
}
|