aboutsummaryrefslogtreecommitdiffstats
path: root/messagebus/src/tests/shutdown/shutdown.cpp
blob: 8dc4aa555b4256d7ff80d25ef8823ed6808f6d95 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.

#include <vespa/messagebus/emptyreply.h>
#include <vespa/messagebus/routing/retrytransienterrorspolicy.h>
#include <vespa/messagebus/testlib/receptor.h>
#include <vespa/messagebus/testlib/simplemessage.h>
#include <vespa/messagebus/testlib/simpleprotocol.h>
#include <vespa/messagebus/testlib/slobrok.h>
#include <vespa/messagebus/testlib/testserver.h>
#include <vespa/vespalib/testkit/testapp.h>
#include <vespa/vespalib/util/exceptions.h>

using namespace mbus;

static const duration TIMEOUT = 120s;

TEST("requireThatListenFailedIsExceptionSafe")
{
    fnet::frt::StandaloneFRT orb;
    ASSERT_TRUE(orb.supervisor().Listen(0));

    Slobrok slobrok;
    try {
        TestServer bar(MessageBusParams(),
                       RPCNetworkParams(slobrok.config())
                       .setListenPort(orb.supervisor().GetListenPort()));
        EXPECT_TRUE(false);
    } catch (vespalib::Exception &e) {
        EXPECT_EQUAL("Failed to start network.", e.getMessage());
    }
}

TEST("requireThatShutdownOnSourceWithPendingIsSafe")
{
    Slobrok slobrok;
    TestServer dstServer(MessageBusParams()
                         .addProtocol(std::make_shared<SimpleProtocol>()),
                         RPCNetworkParams(slobrok.config())
                         .setIdentity(Identity("dst")));
    Receptor dstHandler;
    DestinationSession::UP dstSession = dstServer.mb.createDestinationSession(
            DestinationSessionParams()
            .setName("session")
            .setMessageHandler(dstHandler));
    ASSERT_TRUE(dstSession);

    for (uint32_t i = 0; i < 10; ++i) {
        Message::UP msg(new SimpleMessage("msg"));
        {
            TestServer srcServer(MessageBusParams()
                    .setRetryPolicy(std::make_shared<RetryTransientErrorsPolicy>())
                    .addProtocol(std::make_shared<SimpleProtocol>()),
                    RPCNetworkParams(slobrok.config()));
            Receptor srcHandler;
            SourceSession::UP srcSession = srcServer.mb.createSourceSession(SourceSessionParams()
                    .setThrottlePolicy(IThrottlePolicy::SP())
                    .setReplyHandler(srcHandler));
            ASSERT_TRUE(srcSession);
            ASSERT_TRUE(srcServer.waitSlobrok("dst/session", 1));
            ASSERT_TRUE(srcSession->send(std::move(msg), "dst/session", true).isAccepted());
            msg = dstHandler.getMessage(TIMEOUT);
            ASSERT_TRUE(msg);
        }
        dstSession->acknowledge(std::move(msg));
    }
}

TEST("requireThatShutdownOnIntermediateWithPendingIsSafe")
{
    Slobrok slobrok;
    TestServer dstServer(MessageBusParams()
                         .addProtocol(std::make_shared<SimpleProtocol>()),
                         RPCNetworkParams(slobrok.config())
                         .setIdentity(Identity("dst")));
    Receptor dstHandler;
    DestinationSession::UP dstSession = dstServer.mb.createDestinationSession(
            DestinationSessionParams()
            .setName("session")
            .setMessageHandler(dstHandler));
    ASSERT_TRUE(dstSession);

    TestServer srcServer(MessageBusParams()
                         .setRetryPolicy(IRetryPolicy::SP())
                         .addProtocol(std::make_shared<SimpleProtocol>()),
                         RPCNetworkParams(slobrok.config()));
    Receptor srcHandler;
    SourceSession::UP srcSession = srcServer.mb.createSourceSession(SourceSessionParams()
            .setThrottlePolicy(IThrottlePolicy::SP())
            .setReplyHandler(srcHandler));
    ASSERT_TRUE(srcSession);
    ASSERT_TRUE(srcServer.waitSlobrok("dst/session", 1));

    for (uint32_t i = 0; i < 10; ++i) {
        Message::UP msg = std::make_unique<SimpleMessage>("msg");
        {
            TestServer itrServer(MessageBusParams()
                    .setRetryPolicy(std::make_shared<RetryTransientErrorsPolicy>())
                    .addProtocol(std::make_shared<SimpleProtocol>()),
                    RPCNetworkParams(slobrok.config())
                    .setIdentity(Identity("itr")));
            Receptor itrHandler;
            IntermediateSession::UP itrSession = itrServer.mb.createIntermediateSession(
                    IntermediateSessionParams()
                    .setName("session")
                    .setMessageHandler(itrHandler)
                    .setReplyHandler(itrHandler));
            ASSERT_TRUE(itrSession);
            ASSERT_TRUE(srcServer.waitSlobrok("itr/session", 1));
            ASSERT_TRUE(srcSession->send(std::move(msg), "itr/session dst/session", true).isAccepted());
            msg = itrHandler.getMessage(TIMEOUT);
            ASSERT_TRUE(msg);
            itrSession->forward(std::move(msg));
            msg = dstHandler.getMessage(TIMEOUT);
            ASSERT_TRUE(msg);
        }
        ASSERT_TRUE(srcServer.waitSlobrok("itr/session", 0));
        dstSession->acknowledge(std::move(msg));
        dstServer.mb.sync();
    }
}

TEST_MAIN() { TEST_RUN_ALL(); }