aboutsummaryrefslogtreecommitdiffstats
path: root/messagebus_test/src/tests/trace/trace.cpp
blob: a1b14281f9472189a5c5aebcc90db3f10787b2bf (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.

#include <vespa/vespalib/testkit/testapp.h>
#include <vespa/vespalib/testkit/test_kit.h>
#include <vespa/messagebus/testlib/slobrok.h>
#include <vespa/vespalib/util/stringfmt.h>
#include <vespa/messagebus/emptyreply.h>
#include <vespa/messagebus/messagebus.h>
#include <vespa/messagebus/sourcesession.h>
#include <vespa/messagebus/rpcmessagebus.h>
#include <vespa/messagebus/testlib/testserver.h>
#include <vespa/messagebus/routing/routingspec.h>
#include <vespa/messagebus/testlib/receptor.h>
#include <vespa/messagebus/sourcesessionparams.h>
#include <vespa/messagebus/testlib/simplemessage.h>
#include <vespa/messagebus/testlib/simpleprotocol.h>
#include <iostream>
#include <thread>

#include <vespa/log/log.h>
// Logging setup for this translation unit; "trace_test" is the name handed to
// the log framework (presumably used as the logger component name).
LOG_SETUP("trace_test");

// Pull in the message bus types (RPCMessageBus, TraceNode, Receptor, ...) and
// the printf-style string formatter used when generating config files below.
using namespace mbus;
using vespalib::make_string;

// Test application boilerplate from the vespalib testkit; Test::Main() is
// defined further down in this file (presumably the class itself is declared
// by this macro).
TEST_SETUP(Test);

// Polls the slobrok mirror of the given message bus until at least one
// service matches 'pattern'.
//
// The mirror is queried up to 30000 times with a 10ms pause after each
// unsuccessful lookup.
//
// @param mbus     message bus whose RPC network mirror is queried
// @param pattern  service name pattern handed to the mirror lookup
// @return true as soon as a lookup yields one or more specs, false if the
//         attempt budget is exhausted without a match
bool
waitSlobrok(RPCMessageBus &mbus, const std::string &pattern)
{
    int attemptsLeft = 30000;
    while (attemptsLeft-- > 0) {
        const auto matches = mbus.getRPCNetwork().getMirror().lookup(pattern);
        if (matches.size() != 0) {
            return true;
        }
        // Nothing registered yet; back off briefly before asking again.
        std::this_thread::sleep_for(10ms);
    }
    return false;
}

// Test entry point.
//
// Outline:
//   1. Start a local slobrok and write slobrok.cfg / routing.cfg into the
//      current working directory.
//   2. Start all test servers through ctl.sh and wait until every expected
//      cpp/java session is visible in the slobrok mirror.
//   3. Build the expected trace tree.
//   4. Send a traced message to "test" (up to 50 attempts) until the reply
//      carries no errors and its normalized trace matches the expectation.
//   5. Verify the final reply and stop all servers.
//
// The final verification tolerates a missing reply (every wait timed out):
// that is reported as a test failure instead of dereferencing a null pointer,
// and the servers are still stopped afterwards.
int
Test::Main()
{
    TEST_INIT("trace_test");
    Slobrok slobrok;
    const std::string routing_template = TEST_PATH("routing-template.cfg");
    const std::string ctl_script = TEST_PATH("ctl.sh");

    { // Make slobrok config
        EXPECT_TRUE(system("echo slobrok[1] > slobrok.cfg") == 0);
        EXPECT_TRUE(system(make_string("echo 'slobrok[0].connectionspec tcp/localhost:%d' "
                                      ">> slobrok.cfg", slobrok.port()).c_str()) == 0);
    }
    { // Make routing config
        EXPECT_TRUE(system(("cat " + routing_template + " > routing.cfg").c_str()) == 0);
    }

    EXPECT_TRUE(system((ctl_script + " start all").c_str()) == 0);
    RPCMessageBus mb(ProtocolSet().add(std::make_shared<SimpleProtocol>()),
                     RPCNetworkParams(config::ConfigUri("file:slobrok.cfg")),
                     config::ConfigUri("file:routing.cfg"));

    // Wait for every server session to register before sending anything.
    EXPECT_TRUE(waitSlobrok(mb, "server/cpp/1/A/session"));
    EXPECT_TRUE(waitSlobrok(mb, "server/cpp/2/A/session"));
    EXPECT_TRUE(waitSlobrok(mb, "server/cpp/2/B/session"));
    EXPECT_TRUE(waitSlobrok(mb, "server/cpp/3/A/session"));
    EXPECT_TRUE(waitSlobrok(mb, "server/cpp/3/B/session"));
    EXPECT_TRUE(waitSlobrok(mb, "server/cpp/3/C/session"));
    EXPECT_TRUE(waitSlobrok(mb, "server/cpp/3/D/session"));
    EXPECT_TRUE(waitSlobrok(mb, "server/java/1/A/session"));
    EXPECT_TRUE(waitSlobrok(mb, "server/java/2/A/session"));
    EXPECT_TRUE(waitSlobrok(mb, "server/java/2/B/session"));
    EXPECT_TRUE(waitSlobrok(mb, "server/java/3/A/session"));
    EXPECT_TRUE(waitSlobrok(mb, "server/java/3/B/session"));
    EXPECT_TRUE(waitSlobrok(mb, "server/java/3/C/session"));
    EXPECT_TRUE(waitSlobrok(mb, "server/java/3/D/session"));

    // Expected trace, built inside-out: e3 holds the level 3 servers, e2 wraps
    // e3 between the message/reply notes of each level 2 server, and 'expect'
    // wraps e2 the same way for the level 1 servers.
    // NOTE(review): setStrict(false) presumably makes child order irrelevant
    // for the comparison after normalize() - confirm against TraceNode.
    TraceNode e3 = TraceNode()
        .addChild(TraceNode().addChild("server/cpp/3/A (message)").addChild("server/cpp/3/A (reply)"))
        .addChild(TraceNode().addChild("server/cpp/3/B (message)").addChild("server/cpp/3/B (reply)"))
        .addChild(TraceNode().addChild("server/cpp/3/C (message)").addChild("server/cpp/3/C (reply)"))
        .addChild(TraceNode().addChild("server/cpp/3/D (message)").addChild("server/cpp/3/D (reply)"))
        .addChild(TraceNode().addChild("server/java/3/A (message)").addChild("server/java/3/A (reply)"))
        .addChild(TraceNode().addChild("server/java/3/B (message)").addChild("server/java/3/B (reply)"))
        .addChild(TraceNode().addChild("server/java/3/C (message)").addChild("server/java/3/C (reply)"))
        .addChild(TraceNode().addChild("server/java/3/D (message)").addChild("server/java/3/D (reply)")).setStrict(false);
    TraceNode e2 = TraceNode()
        .addChild(TraceNode().addChild("server/cpp/2/A (message)").addChild(e3).addChild("server/cpp/2/A (reply)"))
        .addChild(TraceNode().addChild("server/cpp/2/B (message)").addChild(e3).addChild("server/cpp/2/B (reply)"))
        .addChild(TraceNode().addChild("server/java/2/A (message)").addChild(e3).addChild("server/java/2/A (reply)"))
        .addChild(TraceNode().addChild("server/java/2/B (message)").addChild(e3).addChild("server/java/2/B (reply)")).setStrict(false);
    TraceNode expect = TraceNode()
        .addChild(TraceNode().addChild("server/cpp/1/A (message)").addChild(e2).addChild("server/cpp/1/A (reply)"))
        .addChild(TraceNode().addChild("server/java/1/A (message)").addChild(e2).addChild("server/java/1/A (reply)")).setStrict(false);
    expect.normalize();

    Receptor src;
    Reply::UP reply;
    SourceSession::UP ss = mb.getMessageBus().createSourceSession(src, SourceSessionParams());
    for (int i = 0; i < 50; ++i) {
        auto msg = std::make_unique<SimpleMessage>("test");
        msg->getTrace().setLevel(1);
        ss->send(std::move(msg), "test");
        reply = src.getReply(10s);
        if (reply) {
            reply->getTrace().normalize();
            // resending breaks the trace, so retry until it has expected form
            if (!reply->hasErrors() && reply->getTrace().encode() == expect.encode()) {
                break;
            }
        }
        std::cout << "Attempt " << i << " got errors, retrying in 1 second.." << std::endl;
        std::this_thread::sleep_for(1s);
    }

    // 'reply' is null if the last wait timed out; report that as a failure
    // rather than dereferencing it, so the servers below are always stopped.
    const bool gotReply = static_cast<bool>(reply);
    EXPECT_TRUE(gotReply);
    if (gotReply) {
        EXPECT_TRUE(!reply->hasErrors());
        EXPECT_EQUAL(reply->getTrace().encode(), expect.encode());
    }
    EXPECT_TRUE(system((ctl_script + " stop all").c_str()) == 0);
    TEST_DONE();
}