1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
|
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <vespa/vespalib/testkit/testapp.h>
#include <vespa/messagebus/errorcode.h>
#include <vespa/messagebus/emptyreply.h>
#include <vespa/messagebus/sourcesession.h>
#include <vespa/messagebus/sourcesessionparams.h>
#include <vespa/messagebus/destinationsession.h>
#include <vespa/messagebus/network/identity.h>
#include <vespa/messagebus/testlib/receptor.h>
#include <vespa/messagebus/testlib/simplemessage.h>
#include <vespa/messagebus/testlib/slobrok.h>
#include <vespa/messagebus/testlib/testserver.h>
using namespace mbus;
using namespace std::chrono_literals;
class Test : public vespalib::TestApp {
public:
int Main() override;
void testZeroTimeout();
void testMessageExpires();
};
TEST_APPHOOK(Test);
int
Test::Main()
{
TEST_INIT("timeout_test");
testZeroTimeout(); TEST_FLUSH();
testMessageExpires(); TEST_FLUSH();
TEST_DONE();
}
void
Test::testZeroTimeout()
{
Slobrok slobrok;
TestServer srcServer(Identity("src"), RoutingSpec(), slobrok);
TestServer dstServer(Identity("dst"), RoutingSpec(), slobrok);
Receptor srcHandler;
SourceSession::UP srcSession = srcServer.mb.createSourceSession(srcHandler, SourceSessionParams().setTimeout(0s));
Receptor dstHandler;
DestinationSession::UP dstSession = dstServer.mb.createDestinationSession("session", true, dstHandler);
ASSERT_TRUE(srcServer.waitSlobrok("dst/session", 1));
ASSERT_TRUE(srcSession->send(Message::UP(new SimpleMessage("msg")), "dst/session", true).isAccepted());
Reply::UP reply = srcHandler.getReply();
ASSERT_TRUE(reply);
EXPECT_EQUAL(1u, reply->getNumErrors());
EXPECT_EQUAL((uint32_t)ErrorCode::TIMEOUT, reply->getError(0).getCode());
}
void
Test::testMessageExpires()
{
Slobrok slobrok;
TestServer srcServer(Identity("src"), RoutingSpec(), slobrok);
TestServer dstServer(Identity("dst"), RoutingSpec(), slobrok);
Receptor srcHandler, dstHandler;
SourceSession::UP srcSession = srcServer.mb.createSourceSession(srcHandler, SourceSessionParams().setTimeout(1s));
DestinationSession::UP dstSession = dstServer.mb.createDestinationSession("session", true, dstHandler);
ASSERT_TRUE(srcServer.waitSlobrok("dst/session", 1));
ASSERT_TRUE(srcSession->send(Message::UP(new SimpleMessage("msg")), "dst/session", true).isAccepted());
Reply::UP reply = srcHandler.getReply();
ASSERT_TRUE(reply);
EXPECT_EQUAL(1u, reply->getNumErrors());
EXPECT_EQUAL((uint32_t)ErrorCode::TIMEOUT, reply->getError(0).getCode());
Message::UP msg = dstHandler.getMessage(1s);
if (msg) {
msg->discard();
}
}
|