1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
|
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "rpctargetpool.h"
#include <vespa/messagebus/steadytimer.h>
namespace mbus {
// Creates a pool entry holding the given target together with the time
// (in milliseconds, as reported by the pool's timer) it was last used.
//
// The target is taken by value and moved into the member, so handing it over
// does not cost an additional shared-pointer reference count round trip.
RPCTargetPool::Entry::Entry(RPCTarget::SP target, uint64_t lastUse) :
    _target(std::move(target)),
    _lastUse(lastUse)
{ }
// Creates a pool that measures idle time with a SteadyTimer.
//
// All member initialization is delegated to the constructor that accepts an
// explicit timer, so the two constructors cannot drift apart.
RPCTargetPool::RPCTargetPool(double expireSecs) :
    RPCTargetPool(ITimer::UP(new SteadyTimer()), expireSecs)
{ }
// Creates a pool that uses the supplied timer as its clock.
//
// timer      - time source; ownership is transferred to the pool.
// expireSecs - idle period in seconds, stored internally as whole
//              milliseconds (the fractional millisecond part is truncated).
//
// NOTE(review): a negative or NaN expireSecs makes the conversion to
// uint64_t undefined - confirm that callers never pass such a value.
RPCTargetPool::RPCTargetPool(ITimer::UP timer, double expireSecs)
    : _lock(),
      _targets(),
      _timer(std::move(timer)),
      _expireMillis(static_cast<uint64_t>(expireSecs * 1000))
{
}
// Runs a forced flush before the members are destroyed, which drops every
// pooled target that is not referenced from outside the pool regardless of
// how recently it was used.
RPCTargetPool::~RPCTargetPool()
{
    const bool ignoreIdleTime = true;
    flushTargets(ignoreIdleTime);
}
void
RPCTargetPool::flushTargets(bool force)
{
uint64_t currentTime = _timer->getMilliTime();
vespalib::LockGuard guard(_lock);
TargetMap::iterator it = _targets.begin();
while (it != _targets.end()) {
Entry &entry = it->second;
if (entry._target.get() != nullptr) {
if (entry._target.use_count() > 1) {
entry._lastUse = currentTime;
++it;
continue; // someone is using this
}
if (!force) {
if (entry._lastUse + _expireMillis > currentTime) {
++it;
continue; // not sufficiently idle
}
}
}
_targets.erase(it++); // postfix increment to move the iterator
}
}
size_t
RPCTargetPool::size()
{
vespalib::LockGuard guard(_lock);
return _targets.size();
}
// Returns the target to use for the connection spec of 'address'.
//
// If the pool already holds a valid target for that spec, its last-use time
// is refreshed and the pooled target is returned. Otherwise any stale entry
// for the spec is dropped, a new RPCTarget is created from the spec and
// 'orb', registered in the pool and returned.
//
// orb     - supervisor handed to the RPCTarget constructor when a new target
//           has to be created.
// address - service address whose connection spec is the pool key.
RPCTarget::SP
RPCTargetPool::getTarget(FRT_Supervisor &orb, const RPCServiceAddress &address)
{
    // Neither the spec nor the timestamp depends on pool state, so obtain
    // them before taking the lock (flushTargets samples the timer the same
    // way) to keep the critical section short.
    string spec = address.getConnectionSpec();
    const uint64_t now = _timer->getMilliTime();

    vespalib::LockGuard guard(_lock);
    auto it = _targets.find(spec);
    if (it != _targets.end()) {
        Entry &entry = it->second;
        // flushTargets tolerates entries with a null target, so do the same
        // here instead of dereferencing blindly; such an entry is treated as
        // stale and replaced below.
        if ((entry._target.get() != nullptr) && entry._target->isValid()) {
            entry._lastUse = now;
            return entry._target;
        }
        _targets.erase(it);
    }
    auto ret = std::make_shared<RPCTarget>(spec, orb);
    _targets.insert(TargetMap::value_type(spec, Entry(ret, now)));
    return ret;
}
} // namespace mbus
|