aboutsummaryrefslogtreecommitdiffstats
path: root/messagebus/src/vespa/messagebus/network/rpctargetpool.cpp
blob: dd977efec085d534d9ee28a452b6ae0bd86727ee (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "rpctargetpool.h"
#include <vespa/messagebus/steadytimer.h>

namespace mbus {

RPCTargetPool::Entry::Entry(RPCTarget::SP target, uint64_t lastUse) :
    _target(target),
    _lastUse(lastUse)
{ }

RPCTargetPool::RPCTargetPool(double expireSecs) :
    _lock(),
    _targets(),
    _timer(new SteadyTimer()),
    _expireMillis(static_cast<uint64_t>(expireSecs * 1000))
{ }

RPCTargetPool::RPCTargetPool(ITimer::UP timer, double expireSecs) :
    _lock(),
    _targets(),
    _timer(std::move(timer)),
    _expireMillis(static_cast<uint64_t>(expireSecs * 1000))
{ }

RPCTargetPool::~RPCTargetPool()
{
    flushTargets(true);
}

void
RPCTargetPool::flushTargets(bool force)
{
    uint64_t currentTime = _timer->getMilliTime();
    vespalib::LockGuard guard(_lock);
    TargetMap::iterator it = _targets.begin();
    while (it != _targets.end()) {
        Entry &entry = it->second;
        if (entry._target.get() != nullptr) {
            if (entry._target.use_count() > 1) {
                entry._lastUse = currentTime;
                ++it;
                continue; // someone is using this
            }
            if (!force) {
                if (entry._lastUse + _expireMillis > currentTime) {
                    ++it;
                    continue; // not sufficiently idle
                }
            }
        }
        _targets.erase(it++); // postfix increment to move the iterator
    }
}

size_t
RPCTargetPool::size()
{
    vespalib::LockGuard guard(_lock);
    return _targets.size();
}

RPCTarget::SP
RPCTargetPool::getTarget(FRT_Supervisor &orb, const RPCServiceAddress &address)
{
    vespalib::LockGuard guard(_lock);
    string spec = address.getConnectionSpec();
    TargetMap::iterator it = _targets.find(spec);
    if (it != _targets.end()) {
        Entry &entry = it->second;
        if (entry._target->isValid()) {
            entry._lastUse = _timer->getMilliTime();
            return entry._target;
        }
        _targets.erase(it);
    }
    auto ret = std::make_shared<RPCTarget>(spec, orb);
    _targets.insert(TargetMap::value_type(spec, Entry(ret, _timer->getMilliTime())));
    return ret;
}

} // namespace mbus