aboutsummaryrefslogtreecommitdiffstats
path: root/messagebus/src/tests/timeout/timeout.cpp
blob: 2ffdab11c40e999c10515bc6943484b086648180 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.

#include <vespa/vespalib/testkit/testapp.h>
#include <vespa/messagebus/errorcode.h>
#include <vespa/messagebus/emptyreply.h>
#include <vespa/messagebus/sourcesession.h>
#include <vespa/messagebus/sourcesessionparams.h>
#include <vespa/messagebus/destinationsession.h>
#include <vespa/messagebus/network/identity.h>
#include <vespa/messagebus/testlib/receptor.h>
#include <vespa/messagebus/testlib/simplemessage.h>
#include <vespa/messagebus/testlib/slobrok.h>
#include <vespa/messagebus/testlib/testserver.h>

#include <memory>

using namespace mbus;
using namespace std::chrono_literals;


class Test : public vespalib::TestApp {
public:
    int Main() override;
    void testZeroTimeout();
    void testMessageExpires();
};

// Registers Test with the test kit. NOTE(review): presumably this macro
// (from vespalib/testkit/testapp.h) generates the program entry point that
// instantiates Test and runs it -- confirm against the macro definition.
TEST_APPHOOK(Test);

/**
 * Runs the timeout scenarios one after another, flushing the test kit
 * state after each of them.
 */
int
Test::Main()
{
    TEST_INIT("timeout_test");

    // Scenario 1: zero timeout on the source session.
    testZeroTimeout();
    TEST_FLUSH();

    // Scenario 2: one second timeout on the source session.
    testMessageExpires();
    TEST_FLUSH();

    TEST_DONE();
}

void
Test::testZeroTimeout()
{
    Slobrok slobrok;
    TestServer srcServer(Identity("src"), RoutingSpec(), slobrok);
    TestServer dstServer(Identity("dst"), RoutingSpec(), slobrok);

    Receptor srcHandler;
    SourceSession::UP srcSession = srcServer.mb.createSourceSession(srcHandler, SourceSessionParams().setTimeout(0s));
    Receptor dstHandler;
    DestinationSession::UP dstSession = dstServer.mb.createDestinationSession("session", true, dstHandler);

    ASSERT_TRUE(srcServer.waitSlobrok("dst/session", 1));
    ASSERT_TRUE(srcSession->send(Message::UP(new SimpleMessage("msg")), "dst/session", true).isAccepted());

    Reply::UP reply = srcHandler.getReply();
    ASSERT_TRUE(reply);
    EXPECT_EQUAL(1u, reply->getNumErrors());
    EXPECT_EQUAL((uint32_t)ErrorCode::TIMEOUT, reply->getError(0).getCode());
}

void
Test::testMessageExpires()
{
    Slobrok slobrok;
    TestServer srcServer(Identity("src"), RoutingSpec(), slobrok);
    TestServer dstServer(Identity("dst"), RoutingSpec(), slobrok);

    Receptor srcHandler, dstHandler;
    SourceSession::UP srcSession = srcServer.mb.createSourceSession(srcHandler, SourceSessionParams().setTimeout(1s));
    DestinationSession::UP dstSession = dstServer.mb.createDestinationSession("session", true, dstHandler);

    ASSERT_TRUE(srcServer.waitSlobrok("dst/session", 1));
    ASSERT_TRUE(srcSession->send(Message::UP(new SimpleMessage("msg")), "dst/session", true).isAccepted());

    Reply::UP reply = srcHandler.getReply();
    ASSERT_TRUE(reply);
    EXPECT_EQUAL(1u, reply->getNumErrors());
    EXPECT_EQUAL((uint32_t)ErrorCode::TIMEOUT, reply->getError(0).getCode());

    Message::UP msg = dstHandler.getMessage(1s);
    if (msg) {
        msg->discard();
    }
}