aboutsummaryrefslogtreecommitdiffstats
path: root/messagebus/src/tests/messageordering/messageordering.cpp
blob: 8b3754a8016af923ac95027f124c04732ea00122 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <vespa/vespalib/testkit/testapp.h>
#include <vespa/messagebus/messagebus.h>
#include <vespa/messagebus/testlib/slobrok.h>
#include <vespa/messagebus/testlib/testserver.h>
#include <vespa/messagebus/testlib/receptor.h>
#include <vespa/messagebus/testlib/simplemessage.h>
#include <vespa/messagebus/testlib/simplereply.h>
#include <vespa/messagebus/errorcode.h>
#include <vespa/vespalib/util/stringfmt.h>

#include <vespa/log/log.h>
LOG_SETUP("messageordering_test");

using namespace mbus;
using namespace std::chrono_literals;

TEST_SETUP(Test);

RoutingSpec
getRouting()
{
    return RoutingSpec()
        .addTable(RoutingTableSpec("Simple")
                  .addHop(HopSpec("dst", "test/dst/session"))
                  .addRoute(RouteSpec("test").addHop("dst")));
}

class MultiReceptor : public IMessageHandler
{
private:
    std::mutex _mon;
    DestinationSession* _destinationSession;
    int _messageCounter;

public:
    MultiReceptor()
        : _mon(),
          _destinationSession(nullptr),
          _messageCounter(0)
    {}
    void handleMessage(Message::UP msg) override
     {
        auto & simpleMsg(dynamic_cast<SimpleMessage&>(*msg));
        LOG(spam, "Attempting to acquire lock for %s",
            simpleMsg.getValue().c_str());

        std::lock_guard guard(_mon);

        vespalib::string expected(vespalib::make_string("%d", _messageCounter));
        LOG(debug, "Got message %p with %s, expecting %s",
            msg.get(),
            simpleMsg.getValue().c_str(),
            expected.c_str());

        auto sr =std::make_unique<SimpleReply>("test reply");
        msg->swapState(*sr);

        if (simpleMsg.getValue() != expected) {
            std::stringstream ss;
            ss << "Received out-of-sequence message! Expected "
               << expected
               << ", but got "
               << simpleMsg.getValue();
            //LOG(warning, "%s", ss.str().c_str());
            sr->addError(Error(ErrorCode::FATAL_ERROR, ss.str()));
        }
        sr->setValue(simpleMsg.getValue());

        ++_messageCounter;
        _destinationSession->reply(Reply::UP(sr.release()));
    }
    void setDestinationSession(DestinationSession& sess) {
        _destinationSession = &sess;
    }
};

class VerifyReplyReceptor : public IReplyHandler
{
    mutable std::mutex              _mon;
    mutable std::condition_variable _cond;
    std::string _failure;
    int _replyCount;
public:
    ~VerifyReplyReceptor() override;
    VerifyReplyReceptor();
    void handleReply(Reply::UP reply) override;
    void waitUntilDone(int waitForCount) const;
    const std::string& getFailure() const { return _failure; }
};

VerifyReplyReceptor::~VerifyReplyReceptor() = default;
VerifyReplyReceptor::VerifyReplyReceptor()
    : _mon(),
      _cond(),
      _failure(),
      _replyCount(0)
{}

void
VerifyReplyReceptor::handleReply(Reply::UP reply)
{
    std::lock_guard lock(_mon);
    if (reply->hasErrors()) {
        std::ostringstream ss;
        ss << "Reply failed with "
           << reply->getError(0).getMessage()
           << "\n"
           << reply->getTrace().toString();
        if (_failure.empty()) {
            _failure = ss.str();
        }
        LOG(warning, "%s", ss.str().c_str());
    } else {
        vespalib::string expected(vespalib::make_string("%d", _replyCount));
        auto & simpleReply(dynamic_cast<SimpleReply&>(*reply));
        if (simpleReply.getValue() != expected) {
            std::stringstream ss;
            ss << "Received out-of-sequence reply! Expected "
               << expected
               << ", but got "
               << simpleReply.getValue();
            LOG(warning, "%s", ss.str().c_str());
            if (_failure.empty()) {
                _failure = ss.str();
            }
        }
    }
    ++_replyCount;
    _cond.notify_all();
}
void
VerifyReplyReceptor::waitUntilDone(int waitForCount) const
{
    std::unique_lock guard(_mon);
    while (_replyCount < waitForCount) {
        _cond.wait_for(guard, 1s);
    }
}

int
Test::Main()
{
    TEST_INIT("messageordering_test");

    Slobrok     slobrok;
    TestServer  srcNet(Identity("test/src"), getRouting(), slobrok);
    TestServer  dstNet(Identity("test/dst"), getRouting(), slobrok);

    VerifyReplyReceptor src;
    MultiReceptor dst;

    SourceSessionParams ssp;
    ssp.setThrottlePolicy(IThrottlePolicy::SP());
    ssp.setTimeout(400s);
    SourceSession::UP      ss = srcNet.mb.createSourceSession(src, ssp);
    DestinationSession::UP ds = dstNet.mb.createDestinationSession("session", true, dst);
    dst.setDestinationSession(*ds);
    ASSERT_EQUAL(400s, ssp.getTimeout());

    // wait for slobrok registration
    ASSERT_TRUE(srcNet.waitSlobrok("test/dst/session"));

    // same message id for all messages in order to guarantee ordering
    int commonMessageId = 42;

    // send messages on client
    const int messageCount = 5000;
    for (int i = 0; i < messageCount; ++i) {
        vespalib::string str(vespalib::make_string("%d", i));
        //std::this_thread::sleep_for(1ms);
        auto msg = std::make_unique<SimpleMessage>(str, true, commonMessageId);
        msg->getTrace().setLevel(9);
        //LOG(debug, "Sending message %p for %d", msg.get(), i);
        ASSERT_EQUAL(uint32_t(ErrorCode::NONE),
                     ss->send(std::move(msg), "test").getError().getCode());
    }
    src.waitUntilDone(messageCount);

    ASSERT_EQUAL(std::string(), src.getFailure());

    TEST_DONE();
}