summaryrefslogtreecommitdiffstats
path: root/fnet/src/tests/connect/connect_test.cpp
blob: 5e48390a2974f8171cdc8671a0d6f9df67f3def3 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.

#include <vespa/vespalib/testkit/test_kit.h>
#include <vespa/fnet/fnet.h>
#include <vespa/vespalib/net/server_socket.h>
#include <vespa/vespalib/util/sync.h>
#include <vespa/vespalib/util/stringfmt.h>

using namespace vespalib;

struct BlockingHostResolver : public AsyncResolver::HostResolver {
    AsyncResolver::SimpleHostResolver resolver;
    Gate caller;
    Gate barrier;
    BlockingHostResolver() : resolver(), caller(), barrier() {}
    vespalib::string ip_address(const vespalib::string &host) override {
        fprintf(stderr, "blocking resolve request: '%s'\n", host.c_str());
        caller.countDown();
        barrier.await();
        vespalib::string result = resolver.ip_address(host);
        fprintf(stderr, "returning resolve result: '%s'\n", result.c_str());
        return result;
    }
    void wait_for_caller() { caller.await(); }
    void release_caller() { barrier.countDown(); }
};

AsyncResolver::SP make_resolver(AsyncResolver::HostResolver::SP host_resolver) {
    AsyncResolver::Params params;
    params.resolver = host_resolver;
    return AsyncResolver::create(params);
}

//-----------------------------------------------------------------------------

struct TransportFixture : FNET_IPacketHandler, FNET_IConnectionCleanupHandler {
    FNET_SimplePacketStreamer streamer;
    FastOS_ThreadPool pool;
    FNET_Transport transport;
    Gate conn_lost;
    Gate conn_deleted;
    TransportFixture() : streamer(nullptr), pool(128 * 1024), transport(),
                         conn_lost(), conn_deleted()
    {
        transport.Start(&pool);
    }
    TransportFixture(AsyncResolver::HostResolver::SP host_resolver)
        : streamer(nullptr), pool(128 * 1024), transport(make_resolver(std::move(host_resolver)), 1),
          conn_lost(), conn_deleted()
    {
        transport.Start(&pool);
    }
    HP_RetCode HandlePacket(FNET_Packet *packet, FNET_Context) override {
        ASSERT_TRUE(packet->GetCommand() == FNET_ControlPacket::FNET_CMD_CHANNEL_LOST);
        conn_lost.countDown();
        packet->Free();
        return FNET_FREE_CHANNEL;
    }
    void Cleanup(FNET_Connection *) override { conn_deleted.countDown(); }
    FNET_Connection *connect(const vespalib::string &spec) {
        FNET_Connection *conn = transport.Connect(spec.c_str(), &streamer, this);
        ASSERT_TRUE(conn != nullptr);
        conn->SetCleanupHandler(this);
        return conn;
    }
    ~TransportFixture() {
        transport.ShutDown(true);
        pool.Close();
    }
};

//-----------------------------------------------------------------------------

TEST_MT_FFF("require that normal connect works", 2,
            ServerSocket("tcp/0"), TransportFixture(), TimeBomb(60))
{
    if (thread_id == 0) {
        SocketHandle socket = f1.accept();
        EXPECT_TRUE(socket.valid());
        TEST_BARRIER();
    } else {
        vespalib::string spec = make_string("tcp/localhost:%d", f1.address().port());
        FNET_Connection *conn = f2.connect(spec);
        TEST_BARRIER();
        conn->Owner()->Close(conn);
        EXPECT_TRUE(f2.conn_lost.await(60000));
        EXPECT_TRUE(!f2.conn_deleted.await(20));
        conn->SubRef();
        EXPECT_TRUE(f2.conn_deleted.await(60000));
    }
}

TEST_FF("require that bogus connect fail asynchronously", TransportFixture(), TimeBomb(60)) {
    FNET_Connection *conn = f1.connect("invalid");
    EXPECT_TRUE(f1.conn_lost.await(60000));
    EXPECT_TRUE(!f1.conn_deleted.await(20));
    conn->SubRef();
    EXPECT_TRUE(f1.conn_deleted.await(60000));
}

TEST_MT_FFFF("require that async close can be called before async resolve completes", 2,
             ServerSocket("tcp/0"), std::shared_ptr<BlockingHostResolver>(new BlockingHostResolver()),
             TransportFixture(f2), TimeBomb(60))
{
    if (thread_id == 0) {
        SocketHandle socket = f1.accept();
        EXPECT_TRUE(!socket.valid());
    } else {
        vespalib::string spec = make_string("tcp/localhost:%d", f1.address().port());
        FNET_Connection *conn = f3.connect(spec);
        f2->wait_for_caller();
        conn->Owner()->Close(conn);
        EXPECT_TRUE(f3.conn_lost.await(60000));
        f2->release_caller();
        EXPECT_TRUE(!f3.conn_deleted.await(20));
        conn->SubRef();
        EXPECT_TRUE(f3.conn_deleted.await(60000));
        f1.shutdown();
    }
}

TEST_MAIN() { TEST_RUN_ALL(); }