1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
|
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <vespa/messagebus/emptyreply.h>
#include <vespa/messagebus/routing/retrytransienterrorspolicy.h>
#include <vespa/messagebus/testlib/receptor.h>
#include <vespa/messagebus/testlib/simplemessage.h>
#include <vespa/messagebus/testlib/simpleprotocol.h>
#include <vespa/messagebus/testlib/slobrok.h>
#include <vespa/messagebus/testlib/testserver.h>
#include <vespa/vespalib/testkit/testapp.h>
#include <vespa/vespalib/util/exceptions.h>
using namespace mbus;

namespace {

// Longest time a test waits for a message to show up at a receptor before
// the wait gives up and the following assertion fails.
const duration TIMEOUT = 120s;

} // namespace
// Checks that constructing a TestServer whose network cannot start surfaces
// the failure as a vespalib::Exception carrying the expected message.
//
// A standalone FRT supervisor is made to listen first; the server under test
// is then configured with that very same listen port, so its own attempt to
// start the network is expected to fail.
TEST("requireThatListenFailedIsExceptionSafe")
{
    fnet::frt::StandaloneFRT orb;
    // NOTE(review): port 0 presumably asks for any free port - confirm against FNET.
    ASSERT_TRUE(orb.supervisor().Listen(0));

    Slobrok slobrok;
    try {
        TestServer bar(MessageBusParams(),
                       RPCNetworkParams(slobrok.config())
                           .setListenPort(orb.supervisor().GetListenPort()));
        // Only reached if construction did not throw, which is a test failure.
        EXPECT_TRUE(false);
    } catch (const vespalib::Exception &e) {
        // The exception is only inspected, so it is caught by const reference.
        EXPECT_EQUAL("Failed to start network.", e.getMessage());
    }
}
// Checks that a source server can be torn down while a message it sent is
// still held, unacknowledged, by the destination.
//
// Each of the 10 rounds creates a fresh source server and session, sends one
// message to "dst/session", waits until the destination handler has received
// it, and then lets the source server and session go out of scope. Only after
// that does the destination acknowledge the message.
TEST("requireThatShutdownOnSourceWithPendingIsSafe")
{
    Slobrok slobrok;

    // Long-lived destination that outlives every source server created below.
    TestServer dstServer(MessageBusParams()
                             .addProtocol(std::make_shared<SimpleProtocol>()),
                         RPCNetworkParams(slobrok.config())
                             .setIdentity(Identity("dst")));
    Receptor dstHandler;
    DestinationSession::UP dstSession = dstServer.mb.createDestinationSession(
            DestinationSessionParams()
                .setName("session")
                .setMessageHandler(dstHandler));
    ASSERT_TRUE(dstSession);

    for (uint32_t i = 0; i < 10; ++i) {
        // Declared outside the inner scope so the received message survives
        // the destruction of the source server.
        Message::UP msg = std::make_unique<SimpleMessage>("msg");
        {
            TestServer srcServer(MessageBusParams()
                                     .setRetryPolicy(std::make_shared<RetryTransientErrorsPolicy>())
                                     .addProtocol(std::make_shared<SimpleProtocol>()),
                                 RPCNetworkParams(slobrok.config()));
            Receptor srcHandler;
            // An empty throttle policy pointer is passed on purpose.
            // NOTE(review): presumably this disables throttling - confirm.
            SourceSession::UP srcSession = srcServer.mb.createSourceSession(SourceSessionParams()
                                                                            .setThrottlePolicy(IThrottlePolicy::SP())
                                                                            .setReplyHandler(srcHandler));
            ASSERT_TRUE(srcSession);
            ASSERT_TRUE(srcServer.waitSlobrok("dst/session", 1));
            ASSERT_TRUE(srcSession->send(std::move(msg), "dst/session", true).isAccepted());

            // Reuse 'msg' to hold the message as seen by the destination.
            msg = dstHandler.getMessage(TIMEOUT);
            ASSERT_TRUE(msg);
        } // srcSession, srcHandler and srcServer are destroyed here, reply still pending.

        // Acknowledge towards a source that no longer exists.
        dstSession->acknowledge(std::move(msg));
    }
}
// Exercises tearing down an intermediate hop while a message it forwarded is
// still waiting to be acknowledged by the destination.
//
// Source and destination live for the whole test. Every round brings up a new
// intermediate server ("itr"), routes one message through it to the
// destination, destroys the intermediate, and only then acknowledges the
// message at the destination.
TEST("requireThatShutdownOnIntermediateWithPendingIsSafe")
{
    Slobrok slobrok;

    // Destination endpoint, registered as "dst/session".
    TestServer dstServer(MessageBusParams()
                             .addProtocol(std::make_shared<SimpleProtocol>()),
                         RPCNetworkParams(slobrok.config())
                             .setIdentity(Identity("dst")));
    Receptor dstReceptor;
    auto dstSession = dstServer.mb.createDestinationSession(
            DestinationSessionParams()
                .setName("session")
                .setMessageHandler(dstReceptor));
    ASSERT_TRUE(dstSession);

    // Source endpoint; both its retry policy and throttle policy are empty pointers.
    TestServer srcServer(MessageBusParams()
                             .setRetryPolicy(IRetryPolicy::SP())
                             .addProtocol(std::make_shared<SimpleProtocol>()),
                         RPCNetworkParams(slobrok.config()));
    Receptor srcReceptor;
    auto srcSession = srcServer.mb.createSourceSession(
            SourceSessionParams()
                .setThrottlePolicy(IThrottlePolicy::SP())
                .setReplyHandler(srcReceptor));
    ASSERT_TRUE(srcSession);
    ASSERT_TRUE(srcServer.waitSlobrok("dst/session", 1));

    for (uint32_t round = 0; round < 10; ++round) {
        // Lives outside the inner scope so it can be acknowledged after the
        // intermediate server is gone.
        Message::UP msg(std::make_unique<SimpleMessage>("msg"));
        {
            TestServer itrServer(MessageBusParams()
                                     .setRetryPolicy(std::make_shared<RetryTransientErrorsPolicy>())
                                     .addProtocol(std::make_shared<SimpleProtocol>()),
                                 RPCNetworkParams(slobrok.config())
                                     .setIdentity(Identity("itr")));
            // One receptor serves as both message handler and reply handler.
            Receptor itrReceptor;
            auto itrSession = itrServer.mb.createIntermediateSession(
                    IntermediateSessionParams()
                        .setName("session")
                        .setMessageHandler(itrReceptor)
                        .setReplyHandler(itrReceptor));
            ASSERT_TRUE(itrSession);
            ASSERT_TRUE(srcServer.waitSlobrok("itr/session", 1));
            ASSERT_TRUE(srcSession->send(std::move(msg), "itr/session dst/session", true).isAccepted());

            // First hop: pick the message up at the intermediate and pass it on.
            msg = itrReceptor.getMessage(TIMEOUT);
            ASSERT_TRUE(msg);
            itrSession->forward(std::move(msg));

            // Second hop: the destination now holds the message.
            msg = dstReceptor.getMessage(TIMEOUT);
            ASSERT_TRUE(msg);
        } // itrSession, itrReceptor and itrServer are destroyed here.

        // Wait until the source no longer sees the intermediate in slobrok
        // before the destination acknowledges.
        ASSERT_TRUE(srcServer.waitSlobrok("itr/session", 0));
        dstSession->acknowledge(std::move(msg));
        dstServer.mb.sync();
    }
}
// Program entry point supplied by the testkit macro; hands control to TEST_RUN_ALL().
TEST_MAIN()
{
    TEST_RUN_ALL();
}
|