diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2019-05-19 13:47:22 +0200 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2019-05-19 13:47:22 +0200 |
commit | 920585840be54248b6df82fee258be8313274337 (patch) | |
tree | 18f73dbbe5dca0cc5a14ba7f12eeb8c1a196bb7a /messagebus/src/main | |
parent | 3d6512745fc195494a4614e941423ed398d3f9bc (diff) |
Prepare for using multiple connections per spec
Diffstat (limited to 'messagebus/src/main')
3 files changed, 75 insertions, 24 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java index 4a263aaae4c..adf889a7b6f 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java @@ -85,7 +85,7 @@ public class RPCNetwork implements Network, MethodHandler { orb = new Supervisor(new Transport(2)); orb.setMaxInputBufferSize(params.getMaxInputBufferSize()); orb.setMaxOutputBufferSize(params.getMaxOutputBufferSize()); - targetPool = new RPCTargetPool(params.getConnectionExpireSecs()); + targetPool = new RPCTargetPool(params.getConnectionExpireSecs(), params.getNumTargetsPerSpec()); servicePool = new RPCServicePool(this, 4096); Method method = new Method("mbus.getVersion", "", "s", this); diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetworkParams.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetworkParams.java index e50670530d3..0517d42659d 100755 --- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetworkParams.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetworkParams.java @@ -19,6 +19,7 @@ public class RPCNetworkParams { private int maxInputBufferSize = 256 * 1024; private int maxOutputBufferSize = 256 * 1024; private double connectionExpireSecs = 30; + private int numTargetsPerSpec = 1; /** * Constructs a new instance of this class with reasonable default values. @@ -40,6 +41,7 @@ public class RPCNetworkParams { connectionExpireSecs = params.connectionExpireSecs; maxInputBufferSize = params.maxInputBufferSize; maxOutputBufferSize = params.maxOutputBufferSize; + numTargetsPerSpec = params.numTargetsPerSpec; } /** @@ -142,6 +144,14 @@ public class RPCNetworkParams { return this; } + RPCNetworkParams setNumTargetsPerSpec(int numTargetsPerSpec) { + this.numTargetsPerSpec = numTargetsPerSpec; + return this; + } + int getNumTargetsPerSpec() { + return numTargetsPerSpec; + } + /** * Returns the maximum input buffer size allowed for the underlying FNET connection. * diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCTargetPool.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCTargetPool.java index 3ae73c1fea0..e681f0834a8 100755 --- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCTargetPool.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCTargetPool.java @@ -6,6 +6,7 @@ import com.yahoo.jrt.Supervisor; import com.yahoo.concurrent.SystemTimer; import com.yahoo.concurrent.Timer; +import java.io.Closeable; import java.util.HashMap; import java.util.Iterator; import java.util.Map; @@ -17,9 +18,10 @@ import java.util.Map; */ public class RPCTargetPool { - private final Map<String, Entry> targets = new HashMap<String, Entry>(); + private final Map<String, Entry> targets = new HashMap<>(); private final Timer timer; private final long expireMillis; + private final int numTargetsPerSpec; /** * Constructs a new instance of this class, and registers the {@link SystemTimer} for detecting and closing @@ -27,8 +29,8 @@ public class RPCTargetPool { * * @param expireSecs The number of seconds until an idle connection is closed. */ - public RPCTargetPool(double expireSecs) { - this(SystemTimer.INSTANCE, expireSecs); + public RPCTargetPool(double expireSecs, int numTargetsPerSpec) { + this(SystemTimer.INSTANCE, expireSecs, numTargetsPerSpec); } /** @@ -38,9 +40,10 @@ public class RPCTargetPool { * @param timer The timer to use for connection expiration. * @param expireSecs The number of seconds until an idle connection is closed. */ - public RPCTargetPool(Timer timer, double expireSecs) { + public RPCTargetPool(Timer timer, double expireSecs, int numTargetsPerSpec) { this.timer = timer; this.expireMillis = (long)(expireSecs * 1000); + this.numTargetsPerSpec = numTargetsPerSpec; } /** @@ -56,9 +59,8 @@ public class RPCTargetPool { long expireTime = currentTime - expireMillis; while (it.hasNext()) { Entry entry = it.next(); - RPCTarget target = entry.target; - if (target.getJRTTarget().isValid()) { - if (target.getRefCount() > 1) { + if (entry.isValid()) { + if (entry.getRefCount() > 1) { entry.lastUse = currentTime; continue; // someone is using this } @@ -68,7 +70,7 @@ public class RPCTargetPool { } } } - target.subRef(); + entry.close(); it.remove(); } } @@ -85,23 +87,25 @@ public class RPCTargetPool { public RPCTarget getTarget(Supervisor orb, RPCServiceAddress address) { Spec spec = address.getConnectionSpec(); String key = spec.toString(); - RPCTarget ret; + long now = timer.milliTime(); synchronized (this) { Entry entry = targets.get(key); if (entry != null) { - if (entry.target.getJRTTarget().isValid()) { - entry.target.addRef(); - entry.lastUse = timer.milliTime(); - return entry.target; + RPCTarget target = entry.getTarget(now); + if (target != null) { + return target; } - entry.target.subRef(); + entry.close(); targets.remove(key); } - ret = new RPCTarget(spec, orb); - targets.put(key, new Entry(ret, timer.milliTime())); + RPCTarget [] tmpTargets = new RPCTarget[numTargetsPerSpec]; + for (int i=0; i < tmpTargets.length; i++) { + tmpTargets[i] = new RPCTarget(spec, orb); + } + entry = new Entry(tmpTargets, now); + targets.put(key, entry); + return entry.getTarget(now); } - ret.addRef(); - return ret; } @@ -118,14 +122,51 @@ public class RPCTargetPool { * Implements a helper class holds the necessary reference and timestamp of a target. The lastUse member is updated * when a call to {@link RPCTargetPool#flushTargets(boolean)} iterates over an active target. */ - private static class Entry { + private static class Entry implements Closeable { - final RPCTarget target; - long lastUse = 0; + private final RPCTarget [] targets; + private int index; + long lastUse; - Entry(RPCTarget target, long lastUse) { - this.target = target; + Entry(RPCTarget [] targets, long lastUse) { + this.targets = targets; this.lastUse = lastUse; } + RPCTarget getTarget(long now) { + if (index >= targets.length) { + index = 0; + } + RPCTarget target = targets[index]; + if (target.getJRTTarget().isValid()) { + target.addRef(); + lastUse = now; + index++; + return target; + } + return null; + } + boolean isValid() { + for (RPCTarget target : targets) { + if ( ! target.getJRTTarget().isValid()) { + return false; + } + } + return true; + } + + int getRefCount() { + int refCount = 0; + for (RPCTarget target : targets) { + refCount += target.getRefCount(); + } + return refCount; + } + + @Override + public void close() { + for (RPCTarget target : targets) { + target.subRef(); + } + } } } |