summaryrefslogtreecommitdiffstats
path: root/messagebus/src/tests/messageordering/messageordering.cpp
blob: df6ca38dd0d2b0d28687846d2430487a3bf0d508 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <vespa/vespalib/testkit/testapp.h>
#include <vespa/messagebus/messagebus.h>
#include <vespa/messagebus/sourcesession.h>
#include <vespa/messagebus/destinationsession.h>
#include <vespa/messagebus/testlib/slobrok.h>
#include <vespa/messagebus/testlib/testserver.h>
#include <vespa/messagebus/routing/routingspec.h>
#include <vespa/messagebus/testlib/receptor.h>
#include <vespa/messagebus/sourcesessionparams.h>
#include <vespa/messagebus/testlib/simplemessage.h>
#include <vespa/messagebus/testlib/simplereply.h>
#include <vespa/messagebus/testlib/simpleprotocol.h>
#include <vespa/messagebus/errorcode.h>
#include <vespa/vespalib/util/vstringfmt.h>
#include <stdexcept>

#include <vespa/log/log.h>
LOG_SETUP("messageordering_test");

using namespace mbus;

TEST_SETUP(Test);

/**
 * Builds the routing configuration shared by both test servers.
 *
 * The spec holds a single table named "Simple" with one hop, "dst", that
 * points at "test/dst/session", and one route, "test", that consists of
 * that hop only.
 */
RoutingSpec
getRouting()
{
    RoutingSpec spec;
    spec.addTable(RoutingTableSpec("Simple")
                  .addHop(HopSpec("dst", "test/dst/session"))
                  .addRoute(RouteSpec("test").addHop("dst")));
    return spec;
}

/**
 * Destination-side message handler that verifies messages arrive in the
 * sequence "0", "1", "2", ... and answers every message with a SimpleReply
 * that echoes the received value.
 *
 * A message whose value does not match the expected sequence number is still
 * answered, but its reply carries a FATAL_ERROR describing the mismatch.
 *
 * setDestinationSession() must be called before the first message is
 * delivered: handleMessage() sends its reply through that session and does
 * not check the pointer.
 */
class MultiReceptor : public IMessageHandler
{
private:
    vespalib::Monitor _mon;                  // serializes handleMessage() calls
    DestinationSession* _destinationSession; // not owned; set via setDestinationSession()
    int _messageCounter;                     // sequence number expected next

public:
    MultiReceptor()
        : _mon(),
          _destinationSession(nullptr),
          _messageCounter(0)
    {}

    // Not copyable. (These were previously declared with `const Receptor &`,
    // which did not actually suppress the implicit copy operations.)
    MultiReceptor(const MultiReceptor &) = delete;
    MultiReceptor &operator=(const MultiReceptor &) = delete;

    /**
     * Checks the value of the incoming message against the expected sequence
     * number and replies through the destination session.
     *
     * @param msg the received message; must be a SimpleMessage, otherwise the
     *            reference dynamic_cast throws std::bad_cast.
     */
    void handleMessage(Message::UP msg) override
    {
        SimpleMessage& simpleMsg(dynamic_cast<SimpleMessage&>(*msg));
        LOG(spam, "Attempting to acquire lock for %s",
            simpleMsg.getValue().c_str());

        // Everything below runs under the monitor so the counter check and
        // increment are atomic with respect to other handler invocations.
        vespalib::MonitorGuard lock(_mon);

        vespalib::string expected(vespalib::make_vespa_string("%d", _messageCounter));
        LOG(debug, "Got message %p with %s, expecting %s",
            msg.get(),
            simpleMsg.getValue().c_str(),
            expected.c_str());

        SimpleReply::UP sr(new SimpleReply("test reply"));
        // NOTE(review): presumably moves the routing state of the message over
        // to the reply so it can be routed back - confirm against swapState().
        msg->swapState(*sr);

        if (simpleMsg.getValue() != expected) {
            std::stringstream ss;
            ss << "Received out-of-sequence message! Expected "
               << expected
               << ", but got "
               << simpleMsg.getValue();
            //LOG(warning, "%s", ss.str().c_str());
            sr->addError(Error(ErrorCode::FATAL_ERROR, ss.str()));
        }
        // The reply always echoes the received value, error or not.
        sr->setValue(simpleMsg.getValue());

        // The counter advances even after a mismatch, so one misplaced
        // message does not shift the expectation for all following ones.
        ++_messageCounter;
        _destinationSession->reply(Reply::UP(sr.release()));
    }

    /**
     * Sets the session used for sending replies. The session is not owned
     * and must outlive all calls to handleMessage().
     */
    void setDestinationSession(DestinationSession& sess) {
        _destinationSession = &sess;
    }
};

/**
 * Source-side reply handler that counts incoming replies and remembers the
 * first failure it observes.
 *
 * A caller can block in waitUntilDone() until a given number of replies has
 * been handled and then inspect getFailure(); an empty string means that no
 * failure was recorded.
 */
class VerifyReplyReceptor : public IReplyHandler
{
private:
    vespalib::Monitor _mon;        // guards _failure and _replyCount
    std::string       _failure;    // first recorded failure text, empty if none
    int               _replyCount; // number of replies handled so far

public:
    VerifyReplyReceptor();
    ~VerifyReplyReceptor();

    void handleReply(Reply::UP reply) override;

    // Blocks until at least waitForCount replies have been handled.
    void waitUntilDone(int waitForCount) const;

    // Returns the stored failure text without taking the monitor.
    const std::string& getFailure() const { return _failure; }
};

// Starts out with no recorded failure and a reply count of zero.
VerifyReplyReceptor::VerifyReplyReceptor()
    : _mon(),
      _failure(),
      _replyCount(0)
{}

// Nothing to do beyond destroying the members.
VerifyReplyReceptor::~VerifyReplyReceptor() = default;

/**
 * Records the outcome of one reply and wakes up threads blocked in
 * waitUntilDone().
 *
 * A reply counts as failed when
 *  - it carries errors (the first error message and the trace are reported),
 *  - it is not a SimpleReply, or
 *  - its value differs from the number of replies handled so far.
 *
 * Every failure is logged as a warning; only the first one is kept for
 * getFailure(). The reply counter is incremented in all cases.
 *
 * @param reply the reply to inspect.
 */
void
VerifyReplyReceptor::handleReply(Reply::UP reply)
{
    vespalib::MonitorGuard lock(_mon);

    std::ostringstream problem;
    bool failed = true;
    if (reply->hasErrors()) {
        problem << "Reply failed with "
                << reply->getError(0).getMessage()
                << "\n"
                << reply->getTrace().toString();
    } else if (SimpleReply *simpleReply = dynamic_cast<SimpleReply *>(reply.get())) {
        vespalib::string expected(vespalib::make_vespa_string("%d", _replyCount));
        if (simpleReply->getValue() != expected) {
            problem << "Received out-of-sequence reply! Expected "
                    << expected
                    << ", but got "
                    << simpleReply->getValue();
        } else {
            failed = false;
        }
    } else {
        // Checked downcast: an unexpected reply type is reported as a test
        // failure instead of being reinterpreted as a SimpleReply.
        problem << "Received reply that is not a SimpleReply";
    }

    if (failed) {
        const std::string text = problem.str();
        LOG(warning, "%s", text.c_str());
        if (_failure.empty()) {
            _failure = text;
        }
    }

    ++_replyCount;
    lock.broadcast();
}
void
VerifyReplyReceptor::waitUntilDone(int waitForCount) const
{
    vespalib::MonitorGuard lock(_mon);
    while (_replyCount < waitForCount) {
        lock.wait(1000);
    }
}

int
Test::Main()
{
    TEST_INIT("messageordering_test");

    Slobrok     slobrok;
    TestServer  srcNet(Identity("test/src"), getRouting(), slobrok);
    TestServer  dstNet(Identity("test/dst"), getRouting(), slobrok);

    VerifyReplyReceptor src;
    MultiReceptor dst;

    SourceSessionParams ssp;
    ssp.setThrottlePolicy(IThrottlePolicy::SP());
    ssp.setTimeout(400);
    SourceSession::UP      ss = srcNet.mb.createSourceSession(src, ssp);
    DestinationSession::UP ds = dstNet.mb.createDestinationSession("session", true, dst);
    ASSERT_EQUAL(400u, ssp.getTimeout());

    // wait for slobrok registration
    ASSERT_TRUE(srcNet.waitSlobrok("test/dst/session"));

    // same message id for all messages in order to guarantee ordering
    int commonMessageId = 42;

    // send messages on client
    const int messageCount = 10000;
    for (int i = 0; i < messageCount; ++i) {
        vespalib::string str(vespalib::make_vespa_string("%d", i));
        //FastOS_Thread::Sleep(1);
        SimpleMessage::UP msg(new SimpleMessage(str, true, commonMessageId));
        msg->getTrace().setLevel(9);
        //LOG(debug, "Sending message %p for %d", msg.get(), i);
        ASSERT_EQUAL(uint32_t(ErrorCode::NONE),
                     ss->send(std::move(msg), "test").getError().getCode());
    }
    src.waitUntilDone(messageCount);

    ASSERT_EQUAL(std::string(), src.getFailure());

    TEST_DONE();
}