aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2021-07-02 10:00:42 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2021-07-02 10:00:42 +0000
commit6ee538b960508d23242dc55e7373e57f2f3e7b90 (patch)
tree6fbbcef3857057762c475cbb65e3f270b220158a
parent683b06daeb3cdfc62ca245f0601a0de8307ddce1 (diff)
Implement use of multiple rpc targets in mbus
-rw-r--r--messagebus/src/tests/targetpool/targetpool.cpp2
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetwork.cpp2
-rw-r--r--messagebus/src/vespa/messagebus/network/rpctargetpool.cpp74
-rw-r--r--messagebus/src/vespa/messagebus/network/rpctargetpool.h22
4 files changed, 59 insertions, 41 deletions
diff --git a/messagebus/src/tests/targetpool/targetpool.cpp b/messagebus/src/tests/targetpool/targetpool.cpp
index 9259f992d6c..9ff352abcf5 100644
--- a/messagebus/src/tests/targetpool/targetpool.cpp
+++ b/messagebus/src/tests/targetpool/targetpool.cpp
@@ -37,7 +37,7 @@ TEST("targetpool_test") {
FRT_Supervisor & orb = server.supervisor();
std::unique_ptr<PoolTimer> ptr(new PoolTimer());
PoolTimer &timer = *ptr;
- RPCTargetPool pool(std::move(ptr), 0.666);
+ RPCTargetPool pool(std::move(ptr), 0.666, 1 );
// Assert that all connections expire.
RPCTarget::SP target;
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
index fbe00275626..41298c28a88 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
@@ -133,7 +133,7 @@ RPCNetwork::RPCNetwork(const RPCNetworkParams &params) :
_mirror(std::make_unique<slobrok::api::MirrorAPI>(*_orb, *_slobrokCfgFactory)),
_regAPI(std::make_unique<slobrok::api::RegisterAPI>(*_orb, *_slobrokCfgFactory)),
_requestedPort(params.getListenPort()),
- _targetPool(std::make_unique<RPCTargetPool>(params.getConnectionExpireSecs())),
+ _targetPool(std::make_unique<RPCTargetPool>(params.getConnectionExpireSecs(), params.getNumRpcTargets())),
_targetPoolTask(std::make_unique<TargetPoolTask>(_scheduler, *_targetPool)),
_servicePool(std::make_unique<RPCServicePool>(*_mirror, 4_Ki)),
_executor(std::make_unique<vespalib::ThreadStackExecutor>(params.getNumThreads(), 64_Ki)),
diff --git a/messagebus/src/vespa/messagebus/network/rpctargetpool.cpp b/messagebus/src/vespa/messagebus/network/rpctargetpool.cpp
index b42ac47e54d..0ed2d15ed9e 100644
--- a/messagebus/src/vespa/messagebus/network/rpctargetpool.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpctargetpool.cpp
@@ -4,23 +4,41 @@
namespace mbus {
-RPCTargetPool::Entry::Entry(RPCTarget::SP target, uint64_t lastUse) :
- _target(target),
- _lastUse(lastUse)
+RPCTargetPool::Entry::Entry(std::vector<RPCTarget::SP> targets, uint64_t lastUse)
+ : _targets(std::move(targets)),
+ _lastUse(lastUse),
+ _next(0)
{ }
-RPCTargetPool::RPCTargetPool(double expireSecs) :
- _lock(),
- _targets(),
- _timer(new SteadyTimer()),
- _expireMillis(static_cast<uint64_t>(expireSecs * 1000))
+RPCTarget::SP
+RPCTargetPool::Entry::getTarget(const LockGuard &, uint64_t now) {
+ if (_next >= _targets.size()) {
+ _next = 0;
+ }
+ RPCTarget::SP target = _targets[_next++];
+ if ( ! target->isValid()) return RPCTarget::SP();
+ _lastUse = now;
+ return target;
+}
+
+bool
+RPCTargetPool::Entry::inUse(const LockGuard &) const {
+ for (const auto & target : _targets) {
+ if (target.use_count() > 1) return true;
+ }
+ return false;
+}
+
+RPCTargetPool::RPCTargetPool(double expireSecs, size_t numTargetsPerSpec)
+ : RPCTargetPool(std::make_unique<SteadyTimer>(), expireSecs, numTargetsPerSpec)
{ }
-RPCTargetPool::RPCTargetPool(ITimer::UP timer, double expireSecs) :
+RPCTargetPool::RPCTargetPool(ITimer::UP timer, double expireSecs, size_t numTargetsPerSpec) :
_lock(),
_targets(),
_timer(std::move(timer)),
- _expireMillis(static_cast<uint64_t>(expireSecs * 1000))
+ _expireMillis(static_cast<uint64_t>(expireSecs * 1000)),
+ _numTargetsPerSpec(numTargetsPerSpec)
{ }
RPCTargetPool::~RPCTargetPool()
@@ -35,21 +53,12 @@ RPCTargetPool::flushTargets(bool force)
LockGuard guard(_lock);
TargetMap::iterator it = _targets.begin();
while (it != _targets.end()) {
- Entry &entry = it->second;
- if (entry._target.get() != nullptr) {
- if (entry._target.use_count() > 1) {
- entry._lastUse = currentTime;
- ++it;
- continue; // someone is using this
- }
- if (!force) {
- if (entry._lastUse + _expireMillis > currentTime) {
- ++it;
- continue; // not sufficiently idle
- }
- }
+ const Entry &entry = it->second;
+ if ( ! entry.inUse(guard) && (force || ((entry.lastUse() + _expireMillis) < currentTime))) {
+ _targets.erase(it++); // postfix increment to move the iterator
+ } else {
+ ++it;
}
- _targets.erase(it++); // postfix increment to move the iterator
}
}
@@ -68,16 +77,19 @@ RPCTargetPool::getTarget(FRT_Supervisor &orb, const RPCServiceAddress &address)
LockGuard guard(_lock);
auto it = _targets.find(spec);
if (it != _targets.end()) {
- Entry &entry = it->second;
- if (entry._target->isValid()) {
- entry._lastUse = currentTime;
- return entry._target;
+ RPCTarget::SP target = it->second.getTarget(guard, currentTime);
+ if (target) {
+ return target;
}
_targets.erase(it);
}
- auto ret = std::make_shared<RPCTarget>(spec, orb);
- _targets.insert(TargetMap::value_type(spec, Entry(ret, currentTime)));
- return ret;
+ std::vector<RPCTarget::SP> targets;
+ targets.reserve(_numTargetsPerSpec);
+ for (size_t i(0); i < _numTargetsPerSpec; i++) {
+ targets.push_back(std::make_shared<RPCTarget>(spec, orb));
+ }
+ _targets.insert(TargetMap::value_type(spec, Entry(std::move(targets), currentTime)));
+ return _targets.find(spec)->second.getTarget(guard, currentTime);
}
} // namespace mbus
diff --git a/messagebus/src/vespa/messagebus/network/rpctargetpool.h b/messagebus/src/vespa/messagebus/network/rpctargetpool.h
index bc9e1a4b19f..2a6fb75d511 100644
--- a/messagebus/src/vespa/messagebus/network/rpctargetpool.h
+++ b/messagebus/src/vespa/messagebus/network/rpctargetpool.h
@@ -16,24 +16,30 @@ namespace mbus {
*/
class RPCTargetPool {
private:
+ using LockGuard = std::lock_guard<std::mutex>;
/**
* Implements a helper class holds the necessary reference and token counter
* for a JRT target to keep connections open as long as they get used from
* time to time.
*/
- struct Entry {
- RPCTarget::SP _target;
- uint64_t _lastUse;
-
- Entry(RPCTarget::SP target, uint64_t lastUse);
+ class Entry {
+ public:
+ Entry(std::vector<RPCTarget::SP> targets, uint64_t lastUse);
+ RPCTarget::SP getTarget(const LockGuard & guard, uint64_t now);
+ uint64_t lastUse() const { return _lastUse; }
+ bool inUse(const LockGuard & guard) const;
+ private:
+ std::vector<RPCTarget::SP> _targets;
+ uint64_t _lastUse;
+ size_t _next;
};
using TargetMap = std::map<string, Entry>;
- using LockGuard = std::lock_guard<std::mutex>;
std::mutex _lock;
TargetMap _targets;
ITimer::UP _timer;
uint64_t _expireMillis;
+ size_t _numTargetsPerSpec;
public:
RPCTargetPool(const RPCTargetPool &) = delete;
@@ -46,7 +52,7 @@ public:
* @param expireSecs The number of seconds until an idle connection is
* closed.
*/
- RPCTargetPool(double expireSecs);
+ RPCTargetPool(double expireSecs, size_t numTargetsPerSpec);
/**
* Constructs a new instance of this class, using the given {@link Timer}
@@ -57,7 +63,7 @@ public:
* @param expireSecs The number of seconds until an idle connection is
* closed.
*/
- RPCTargetPool(ITimer::UP timer, double expireSecs);
+ RPCTargetPool(ITimer::UP timer, double expireSecs, size_t numTargetsPerSpec);
/**
* Destructor. Frees any allocated resources.