aboutsummaryrefslogtreecommitdiffstats
path: root/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/TargetPoolTestCase.java
blob: 86a6f37a64925a8368a8c4e24a548bc98c7d733b (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.messagebus.network.rpc;

import com.yahoo.jrt.ListenFailedException;
import com.yahoo.jrt.Supervisor;
import com.yahoo.jrt.Transport;
import com.yahoo.jrt.slobrok.server.Slobrok;
import com.yahoo.concurrent.Timer;
import com.yahoo.concurrent.ManualTimer;
import com.yahoo.messagebus.network.rpc.test.TestServer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.List;

import static org.junit.jupiter.api.Assertions.*;

/**
 * @author Simon Thoresen Hult
 */
public class TargetPoolTestCase {

    private Slobrok slobrok;
    private List<TestServer> servers;
    private Supervisor orb;

    @BeforeEach
    public void setUp() throws ListenFailedException {
        slobrok = new Slobrok();
        servers = new ArrayList<>();
        orb = new Supervisor(new Transport());
    }

    @AfterEach
    public void tearDown() {
        slobrok.stop();
        for (TestServer server : servers) {
            server.destroy();
        }
        orb.transport().shutdown().join();
    }

    @Test
    void testConnectionCycling()  {
        // Necessary setup to be able to resolve targets.
        RPCServiceAddress adr1 = registerServer();

        Timer timer = new ManualTimer();
        RPCTargetPool pool1 = new RPCTargetPool(timer, 0.666, 1);

        RPCTarget target1 = pool1.getTarget(orb, adr1);
        RPCTarget target2 = pool1.getTarget(orb, adr1);
        assertSame(target1, target2);
        target1.subRef();
        target2.subRef();

        RPCTargetPool pool3 = new RPCTargetPool(timer, 0.666, 3);

        target1 = pool3.getTarget(orb, adr1);
        target2 = pool3.getTarget(orb, adr1);
        RPCTarget target3 = pool3.getTarget(orb, adr1);
        assertNotSame(target1, target2);
        assertNotSame(target2, target3);
        assertNotSame(target3, target1);


        RPCTarget target4 = pool3.getTarget(orb, adr1);
        assertSame(target1, target4);
        target1.subRef();
        target2.subRef();
        target3.subRef();
        target4.subRef();
    }

    @Test
    void testConnectionExpire()  {
        // Necessary setup to be able to resolve targets.
        RPCServiceAddress adr1 = registerServer();
        RPCServiceAddress adr2 = registerServer();
        RPCServiceAddress adr3 = registerServer();

        ManualTimer timer = new ManualTimer();
        RPCTargetPool pool = new RPCTargetPool(timer, 0.666, 1);

        // Assert that all connections expire.
        RPCTarget target;
        assertNotNull(target = pool.getTarget(orb, adr1));
        target.subRef();
        assertNotNull(target = pool.getTarget(orb, adr2));
        target.subRef();
        assertNotNull(target = pool.getTarget(orb, adr3));
        target.subRef();
        assertEquals(3, pool.size());
        for (int i = 0; i < 10; ++i) {
            pool.flushTargets(false);
            assertEquals(3, pool.size());
        }
        timer.advance(999);
        pool.flushTargets(false);
        assertEquals(0, pool.size());

        // Assert that only idle connections expire.
        assertNotNull(target = pool.getTarget(orb, adr1));
        target.subRef();
        assertNotNull(target = pool.getTarget(orb, adr2));
        target.subRef();
        assertNotNull(target = pool.getTarget(orb, adr3));
        target.subRef();
        assertEquals(3, pool.size());
        timer.advance(444);
        pool.flushTargets(false);
        assertEquals(3, pool.size());
        assertNotNull(target = pool.getTarget(orb, adr2));
        target.subRef();
        assertNotNull(target = pool.getTarget(orb, adr3));
        target.subRef();
        assertEquals(3, pool.size());
        timer.advance(444);
        pool.flushTargets(false);
        assertEquals(2, pool.size());
        assertNotNull(target = pool.getTarget(orb, adr3));
        target.subRef();
        timer.advance(444);
        pool.flushTargets(false);
        assertEquals(1, pool.size());
        timer.advance(444);
        pool.flushTargets(false);
        assertEquals(0, pool.size());

        // Assert that connections never expire while they are referenced.
        assertNotNull(target = pool.getTarget(orb, adr1));
        assertEquals(1, pool.size());
        for (int i = 0; i < 10; ++i) {
            timer.advance(999);
            pool.flushTargets(false);
            assertEquals(1, pool.size());
        }
        target.subRef();
        timer.advance(999);
        pool.flushTargets(false);
        assertEquals(0, pool.size());
    }

    private RPCServiceAddress registerServer() {
        servers.add(new TestServer("srv" + servers.size(), null, slobrok, null));
        return new RPCServiceAddress("foo/bar", servers.get(servers.size() - 1).mb.getConnectionSpec());
    }

}