1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
|
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <vespa/messagebus/network/rpctargetpool.h>
#include <vespa/messagebus/testlib/slobrok.h>
#include <vespa/messagebus/testlib/testserver.h>
#include <vespa/vespalib/testkit/testapp.h>
#include <vespa/log/log.h>
LOG_SETUP("targetpool_test");
using namespace mbus;
/**
 * Manually driven ITimer implementation for this test.
 *
 * The reported time never advances on its own; the test bumps the public
 * `millis` field directly to simulate the passage of time.
 */
class PoolTimer : public ITimer {
public:
    /// Value reported by getMilliTime(); starts at zero.
    uint64_t millis = 0;

    PoolTimer() = default;

    /// Returns the current value of `millis`.
    uint64_t getMilliTime() const override { return millis; }
};
// Testkit macro; presumably declares the `Test` application class whose
// Main() is defined right below.
TEST_SETUP(Test);

/**
 * Verifies how RPCTargetPool drops targets when flushTargets() is called.
 *
 * Three scenarios are checked, using a hand-controlled PoolTimer:
 *   1. All unreferenced targets are dropped once enough time has passed.
 *   2. Only targets that have not been requested recently are dropped.
 *   3. A target is never dropped while the test still holds a reference to it.
 *
 * The pool is constructed with 0.666, presumably an expire time in seconds
 * (the assertions are consistent with a ~666 ms idle limit: 444 ms survives,
 * 888 ms and 999 ms do not) -- confirm against RPCTargetPool.
 */
int
Test::Main()
{
    TEST_INIT("targetpool_test");

    // Necessary setup to be able to resolve targets.
    Slobrok slobrok;
    TestServer srv1(Identity("srv1"), RoutingSpec(), slobrok);
    RPCServiceAddress adr1("", srv1.mb.getConnectionSpec());
    TestServer srv2(Identity("srv2"), RoutingSpec(), slobrok);
    RPCServiceAddress adr2("", srv2.mb.getConnectionSpec());
    TestServer srv3(Identity("srv3"), RoutingSpec(), slobrok);
    RPCServiceAddress adr3("", srv3.mb.getConnectionSpec());

    FRT_Supervisor orb(1024u, 1);
    ASSERT_TRUE(orb.Start());

    // Ownership of the timer moves into the pool; keep a reference so the
    // test can still advance the clock afterwards.
    std::unique_ptr<PoolTimer> ptr(new PoolTimer());
    PoolTimer &timer = *ptr;
    RPCTargetPool pool(std::move(ptr), 0.666);

    // Assert that all connections expire.
    // Each target is released immediately so only the pool refers to it.
    RPCTarget::SP target;
    ASSERT_TRUE((target = pool.getTarget(orb, adr1)).get() != nullptr); target.reset();
    ASSERT_TRUE((target = pool.getTarget(orb, adr2)).get() != nullptr); target.reset();
    ASSERT_TRUE((target = pool.getTarget(orb, adr3)).get() != nullptr); target.reset();
    EXPECT_EQUAL(3u, pool.size());
    // Without advancing the timer, repeated flushes must not drop anything.
    for (uint32_t i = 0; i < 10; ++i) {
        pool.flushTargets(false);
        EXPECT_EQUAL(3u, pool.size());
    }
    timer.millis += 999;
    pool.flushTargets(false);
    EXPECT_EQUAL(0u, pool.size());

    // Assert that only idle connections expire.
    ASSERT_TRUE((target = pool.getTarget(orb, adr1)).get() != nullptr); target.reset();
    ASSERT_TRUE((target = pool.getTarget(orb, adr2)).get() != nullptr); target.reset();
    ASSERT_TRUE((target = pool.getTarget(orb, adr3)).get() != nullptr); target.reset();
    EXPECT_EQUAL(3u, pool.size());
    // 444 ms since every target was requested: all three are kept.
    timer.millis += 444;
    pool.flushTargets(false);
    EXPECT_EQUAL(3u, pool.size());
    // Re-request adr2 and adr3 only; adr1 reaches 888 ms idle and is dropped.
    ASSERT_TRUE((target = pool.getTarget(orb, adr2)).get() != nullptr); target.reset();
    ASSERT_TRUE((target = pool.getTarget(orb, adr3)).get() != nullptr); target.reset();
    timer.millis += 444;
    pool.flushTargets(false);
    EXPECT_EQUAL(2u, pool.size());
    // Re-request adr3 only; adr2 reaches 888 ms idle and is dropped.
    ASSERT_TRUE((target = pool.getTarget(orb, adr3)).get() != nullptr); target.reset();
    timer.millis += 444;
    pool.flushTargets(false);
    EXPECT_EQUAL(1u, pool.size());
    // Nothing re-requested; the last target (adr3) is dropped as well.
    timer.millis += 444;
    pool.flushTargets(false);
    EXPECT_EQUAL(0u, pool.size());

    // Assert that connections never expire while they are referenced.
    // `target` is deliberately NOT reset here, so the test keeps a reference.
    ASSERT_TRUE((target = pool.getTarget(orb, adr1)).get() != nullptr);
    EXPECT_EQUAL(1u, pool.size());
    for (uint32_t i = 0; i < 10; ++i) {
        timer.millis += 999;
        pool.flushTargets(false);
        EXPECT_EQUAL(1u, pool.size());
    }
    // Once the reference is released the target is dropped on the next
    // flush after the timer has advanced.
    target.reset();
    timer.millis += 999;
    pool.flushTargets(false);
    EXPECT_EQUAL(0u, pool.size());

    orb.ShutDown(true);
    TEST_DONE();
}
|