diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2021-07-02 10:00:42 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2021-07-02 10:00:42 +0000 |
commit | 6ee538b960508d23242dc55e7373e57f2f3e7b90 (patch) | |
tree | 6fbbcef3857057762c475cbb65e3f270b220158a /messagebus | |
parent | 683b06daeb3cdfc62ca245f0601a0de8307ddce1 (diff) |
Implement use of multiple rpc targets in mbus
Diffstat (limited to 'messagebus')
4 files changed, 59 insertions, 41 deletions
diff --git a/messagebus/src/tests/targetpool/targetpool.cpp b/messagebus/src/tests/targetpool/targetpool.cpp index 9259f992d6c..9ff352abcf5 100644 --- a/messagebus/src/tests/targetpool/targetpool.cpp +++ b/messagebus/src/tests/targetpool/targetpool.cpp @@ -37,7 +37,7 @@ TEST("targetpool_test") { FRT_Supervisor & orb = server.supervisor(); std::unique_ptr<PoolTimer> ptr(new PoolTimer()); PoolTimer &timer = *ptr; - RPCTargetPool pool(std::move(ptr), 0.666); + RPCTargetPool pool(std::move(ptr), 0.666, 1 ); // Assert that all connections expire. RPCTarget::SP target; diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp index fbe00275626..41298c28a88 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp @@ -133,7 +133,7 @@ RPCNetwork::RPCNetwork(const RPCNetworkParams ¶ms) : _mirror(std::make_unique<slobrok::api::MirrorAPI>(*_orb, *_slobrokCfgFactory)), _regAPI(std::make_unique<slobrok::api::RegisterAPI>(*_orb, *_slobrokCfgFactory)), _requestedPort(params.getListenPort()), - _targetPool(std::make_unique<RPCTargetPool>(params.getConnectionExpireSecs())), + _targetPool(std::make_unique<RPCTargetPool>(params.getConnectionExpireSecs(), params.getNumRpcTargets())), _targetPoolTask(std::make_unique<TargetPoolTask>(_scheduler, *_targetPool)), _servicePool(std::make_unique<RPCServicePool>(*_mirror, 4_Ki)), _executor(std::make_unique<vespalib::ThreadStackExecutor>(params.getNumThreads(), 64_Ki)), diff --git a/messagebus/src/vespa/messagebus/network/rpctargetpool.cpp b/messagebus/src/vespa/messagebus/network/rpctargetpool.cpp index b42ac47e54d..0ed2d15ed9e 100644 --- a/messagebus/src/vespa/messagebus/network/rpctargetpool.cpp +++ b/messagebus/src/vespa/messagebus/network/rpctargetpool.cpp @@ -4,23 +4,41 @@ namespace mbus { -RPCTargetPool::Entry::Entry(RPCTarget::SP target, uint64_t lastUse) : - _target(target), - _lastUse(lastUse) +RPCTargetPool::Entry::Entry(std::vector<RPCTarget::SP> targets, uint64_t lastUse) + : _targets(std::move(targets)), + _lastUse(lastUse), + _next(0) { } -RPCTargetPool::RPCTargetPool(double expireSecs) : - _lock(), - _targets(), - _timer(new SteadyTimer()), - _expireMillis(static_cast<uint64_t>(expireSecs * 1000)) +RPCTarget::SP +RPCTargetPool::Entry::getTarget(const LockGuard &, uint64_t now) { + if (_next >= _targets.size()) { + _next = 0; + } + RPCTarget::SP target = _targets[_next++]; + if ( ! target->isValid()) return RPCTarget::SP(); + _lastUse = now; + return target; +} + +bool +RPCTargetPool::Entry::inUse(const LockGuard &) const { + for (const auto & target : _targets) { + if (target.use_count() > 1) return true; + } + return false; +} + +RPCTargetPool::RPCTargetPool(double expireSecs, size_t numTargetsPerSpec) + : RPCTargetPool(std::make_unique<SteadyTimer>(), expireSecs, numTargetsPerSpec) { } -RPCTargetPool::RPCTargetPool(ITimer::UP timer, double expireSecs) : +RPCTargetPool::RPCTargetPool(ITimer::UP timer, double expireSecs, size_t numTargetsPerSpec) : _lock(), _targets(), _timer(std::move(timer)), - _expireMillis(static_cast<uint64_t>(expireSecs * 1000)) + _expireMillis(static_cast<uint64_t>(expireSecs * 1000)), + _numTargetsPerSpec(numTargetsPerSpec) { } RPCTargetPool::~RPCTargetPool() @@ -35,21 +53,12 @@ RPCTargetPool::flushTargets(bool force) LockGuard guard(_lock); TargetMap::iterator it = _targets.begin(); while (it != _targets.end()) { - Entry &entry = it->second; - if (entry._target.get() != nullptr) { - if (entry._target.use_count() > 1) { - entry._lastUse = currentTime; - ++it; - continue; // someone is using this - } - if (!force) { - if (entry._lastUse + _expireMillis > currentTime) { - ++it; - continue; // not sufficiently idle - } - } + const Entry &entry = it->second; + if ( ! entry.inUse(guard) && (force || ((entry.lastUse() + _expireMillis) < currentTime))) { + _targets.erase(it++); // postfix increment to move the iterator + } else { + ++it; } - _targets.erase(it++); // postfix increment to move the iterator } } @@ -68,16 +77,19 @@ RPCTargetPool::getTarget(FRT_Supervisor &orb, const RPCServiceAddress &address) LockGuard guard(_lock); auto it = _targets.find(spec); if (it != _targets.end()) { - Entry &entry = it->second; - if (entry._target->isValid()) { - entry._lastUse = currentTime; - return entry._target; + RPCTarget::SP target = it->second.getTarget(guard, currentTime); + if (target) { + return target; } _targets.erase(it); } - auto ret = std::make_shared<RPCTarget>(spec, orb); - _targets.insert(TargetMap::value_type(spec, Entry(ret, currentTime))); - return ret; + std::vector<RPCTarget::SP> targets; + targets.reserve(_numTargetsPerSpec); + for (size_t i(0); i < _numTargetsPerSpec; i++) { + targets.push_back(std::make_shared<RPCTarget>(spec, orb)); + } + _targets.insert(TargetMap::value_type(spec, Entry(std::move(targets), currentTime))); + return _targets.find(spec)->second.getTarget(guard, currentTime); } } // namespace mbus diff --git a/messagebus/src/vespa/messagebus/network/rpctargetpool.h b/messagebus/src/vespa/messagebus/network/rpctargetpool.h index bc9e1a4b19f..2a6fb75d511 100644 --- a/messagebus/src/vespa/messagebus/network/rpctargetpool.h +++ b/messagebus/src/vespa/messagebus/network/rpctargetpool.h @@ -16,24 +16,30 @@ namespace mbus { */ class RPCTargetPool { private: + using LockGuard = std::lock_guard<std::mutex>; /** * Implements a helper class holds the necessary reference and token counter * for a JRT target to keep connections open as long as they get used from * time to time. */ - struct Entry { - RPCTarget::SP _target; - uint64_t _lastUse; - - Entry(RPCTarget::SP target, uint64_t lastUse); + class Entry { + public: + Entry(std::vector<RPCTarget::SP> targets, uint64_t lastUse); + RPCTarget::SP getTarget(const LockGuard & guard, uint64_t now); + uint64_t lastUse() const { return _lastUse; } + bool inUse(const LockGuard & guard) const; + private: + std::vector<RPCTarget::SP> _targets; + uint64_t _lastUse; + size_t _next; }; using TargetMap = std::map<string, Entry>; - using LockGuard = std::lock_guard<std::mutex>; std::mutex _lock; TargetMap _targets; ITimer::UP _timer; uint64_t _expireMillis; + size_t _numTargetsPerSpec; public: RPCTargetPool(const RPCTargetPool &) = delete; @@ -46,7 +52,7 @@ public: * @param expireSecs The number of seconds until an idle connection is * closed. */ - RPCTargetPool(double expireSecs); + RPCTargetPool(double expireSecs, size_t numTargetsPerSpec); /** * Constructs a new instance of this class, using the given {@link Timer} @@ -57,7 +63,7 @@ public: * @param expireSecs The number of seconds until an idle connection is * closed. */ - RPCTargetPool(ITimer::UP timer, double expireSecs); + RPCTargetPool(ITimer::UP timer, double expireSecs, size_t numTargetsPerSpec); /** * Destructor. Frees any allocated resources. |