aboutsummaryrefslogtreecommitdiffstats
path: root/messagebus/src/vespa/messagebus/network/rpctargetpool.cpp
blob: db09b12711420361908bdb4bccc6061191107c56 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "rpctargetpool.h"
#include <vespa/messagebus/steadytimer.h>

namespace mbus {

// Creates a pool entry that takes ownership of the given target handles.
//
// targets: the handles this entry hands out from getTarget(); moved into the entry.
// lastUse: initial last-use timestamp (the pool passes a value from its millisecond timer).
//
// The rotation index _next starts at 0, so the first getTarget() call returns the first target.
RPCTargetPool::Entry::Entry(std::vector<RPCTarget::SP> targets, uint64_t lastUse)
    : _targets(std::move(targets)),
      _lastUse(lastUse),
      _next(0)
{ }

// Returns the next target of this entry in round-robin order.
//
// The unnamed LockGuard parameter is never read; it only forces callers to hold a guard
// when calling.
// now: timestamp stored as the entry's last-use time when a valid target is handed out.
//
// Returns an empty handle, leaving the last-use time untouched, when:
//   - the entry holds no targets at all (e.g. the pool was set up with zero targets per spec),
//   - the selected slot holds a null handle, or
//   - the selected target reports itself as not valid.
// The owning pool treats an empty handle as "this entry is stale" and replaces the entry.
RPCTarget::SP
RPCTargetPool::Entry::getTarget(const LockGuard &, uint64_t now) {
    if (_targets.empty()) {
        // Nothing to rotate over; indexing below would read out of bounds.
        return RPCTarget::SP();
    }
    if (_next >= _targets.size()) {
        _next = 0; // wrap the rotation index
    }
    RPCTarget::SP target = _targets[_next++];
    if ( ! target || ! target->isValid()) {
        return RPCTarget::SP();
    }
    _lastUse = now;
    return target;
}

bool
RPCTargetPool::Entry::inUse(const LockGuard &) const {
    for (const auto & target : _targets) {
        if (target.use_count() > 1) {
            return true;
        }
    }
    return false;
}

// Convenience constructor: delegates to the timer-taking constructor, supplying a newly
// created SteadyTimer as the time source.
//
// expireSecs:        idle time, in seconds, after which flushTargets() may drop an unused entry.
// numTargetsPerSpec: number of targets created for each connection spec.
RPCTargetPool::RPCTargetPool(double expireSecs, size_t numTargetsPerSpec)
    : RPCTargetPool(std::make_unique<SteadyTimer>(), expireSecs, numTargetsPerSpec)
{ }

// Creates an empty pool.
//
// timer:             time source; the pool takes ownership and reads it via getMilliTime().
// expireSecs:        idle time in seconds before an unused entry may be flushed. It is stored
//                    as whole milliseconds; the cast truncates any sub-millisecond fraction.
// numTargetsPerSpec: number of targets getTarget() creates for each new connection spec.
//
// NOTE(review): converting a negative or NaN double to uint64_t is undefined behavior, so
// callers are presumably expected to pass a non-negative expireSecs - confirm.
RPCTargetPool::RPCTargetPool(ITimer::UP timer, double expireSecs, size_t numTargetsPerSpec) :
    _lock(),
    _targets(),
    _timer(std::move(timer)),
    _expireMillis(static_cast<uint64_t>(expireSecs * 1000)),
    _numTargetsPerSpec(numTargetsPerSpec)
{ }

// Forces a flush on destruction: with force == true, flushTargets() drops every entry that
// is not currently in use, regardless of how recently it was used.
RPCTargetPool::~RPCTargetPool()
{
    flushTargets(true);
}

void
RPCTargetPool::flushTargets(bool force)
{
    uint64_t currentTime = _timer->getMilliTime();
    // Erase RPC targets outside our lock to prevent the following mutex order inversion potential:
    //   flushTargets (pool lock) -> FNET transport thread post event (transport thread lock)
    //   FNET CheckTasks (transport thread lock) -> periodic flushTargets task run -> flushTargets (pool lock)
    std::vector<Entry> to_erase_on_scope_exit;
    LockGuard guard(_lock);
    {
        auto it = _targets.begin();
        while (it != _targets.end()) {
            const Entry& entry = it->second;
            if (!entry.inUse(guard) && (force || ((entry.lastUse() + _expireMillis) < currentTime))) {
                to_erase_on_scope_exit.emplace_back(std::move(it->second));
                it = _targets.erase(it);
            } else {
                ++it;
            }
        }
    }
}

// Returns the number of entries (one per connection spec) currently held by the pool.
// The count is read while holding the pool lock.
size_t
RPCTargetPool::size()
{
    LockGuard guard(_lock);
    const size_t numEntries = _targets.size();
    return numEntries;
}

// Returns a target for the connection spec of the given service address.
//
// orb:     supervisor handed to RPCTarget::create() when new targets have to be made.
// address: service address; only its connection spec is used, as the lookup key.
//
// If the pool already holds an entry for the spec and that entry yields a target, the
// target is returned and the entry's last-use time is refreshed. If the entry yields
// nothing (its next target is no longer valid), the whole entry is discarded and replaced
// by _numTargetsPerSpec newly created targets, the first of which is returned.
RPCTarget::SP
RPCTargetPool::getTarget(FRT_Supervisor &orb, const RPCServiceAddress &address)
{
    const string & spec = address.getConnectionSpec();
    uint64_t currentTime = _timer->getMilliTime();
    // A discarded entry must not release its RPC targets while the pool lock is held:
    // doing so can post an event to the FNET transport thread (taking its lock), while that
    // thread may itself be waiting for the pool lock in a periodic flushTargets() run.
    // This vector is declared before the guard, so it is destroyed after the lock is released.
    std::vector<Entry> to_erase_on_scope_exit;
    LockGuard guard(_lock);
    auto it = _targets.find(spec);
    if (it != _targets.end()) {
        RPCTarget::SP target = it->second.getTarget(guard, currentTime);
        if (target) {
            return target;
        }
        // Stale entry: move it out for deferred destruction, then drop the map slot.
        to_erase_on_scope_exit.emplace_back(std::move(it->second));
        _targets.erase(it);
    }
    std::vector<RPCTarget::SP> targets;
    targets.reserve(_numTargetsPerSpec);
    for (size_t i = 0; i < _numTargetsPerSpec; ++i) {
        targets.push_back(RPCTarget::create(spec, orb));
    }
    // The key is known to be absent here, so the insert succeeds; reuse the returned
    // iterator instead of looking the spec up a second time.
    auto inserted = _targets.insert(TargetMap::value_type(spec, Entry(std::move(targets), currentTime)));
    return inserted.first->second.getTarget(guard, currentTime);
}

} // namespace mbus