diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2019-05-19 13:47:22 +0200 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2019-05-19 13:47:22 +0200 |
commit | 920585840be54248b6df82fee258be8313274337 (patch) | |
tree | 18f73dbbe5dca0cc5a14ba7f12eeb8c1a196bb7a /messagebus | |
parent | 3d6512745fc195494a4614e941423ed398d3f9bc (diff) |
Prepare for using multiple connections per spec
Diffstat (limited to 'messagebus')
4 files changed, 110 insertions, 27 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java index 4a263aaae4c..adf889a7b6f 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java @@ -85,7 +85,7 @@ public class RPCNetwork implements Network, MethodHandler { orb = new Supervisor(new Transport(2)); orb.setMaxInputBufferSize(params.getMaxInputBufferSize()); orb.setMaxOutputBufferSize(params.getMaxOutputBufferSize()); - targetPool = new RPCTargetPool(params.getConnectionExpireSecs()); + targetPool = new RPCTargetPool(params.getConnectionExpireSecs(), params.getNumTargetsPerSpec()); servicePool = new RPCServicePool(this, 4096); Method method = new Method("mbus.getVersion", "", "s", this); diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetworkParams.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetworkParams.java index e50670530d3..0517d42659d 100755 --- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetworkParams.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetworkParams.java @@ -19,6 +19,7 @@ public class RPCNetworkParams { private int maxInputBufferSize = 256 * 1024; private int maxOutputBufferSize = 256 * 1024; private double connectionExpireSecs = 30; + private int numTargetsPerSpec = 1; /** * Constructs a new instance of this class with reasonable default values. @@ -40,6 +41,7 @@ public class RPCNetworkParams { connectionExpireSecs = params.connectionExpireSecs; maxInputBufferSize = params.maxInputBufferSize; maxOutputBufferSize = params.maxOutputBufferSize; + numTargetsPerSpec = params.numTargetsPerSpec; } /** @@ -142,6 +144,14 @@ public class RPCNetworkParams { return this; } + RPCNetworkParams setNumTargetsPerSpec(int numTargetsPerSpec) { + this.numTargetsPerSpec = numTargetsPerSpec; + return this; + } + int getNumTargetsPerSpec() { + return numTargetsPerSpec; + } + /** * Returns the maximum input buffer size allowed for the underlying FNET connection. * diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCTargetPool.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCTargetPool.java index 3ae73c1fea0..e681f0834a8 100755 --- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCTargetPool.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCTargetPool.java @@ -6,6 +6,7 @@ import com.yahoo.jrt.Supervisor; import com.yahoo.concurrent.SystemTimer; import com.yahoo.concurrent.Timer; +import java.io.Closeable; import java.util.HashMap; import java.util.Iterator; import java.util.Map; @@ -17,9 +18,10 @@ import java.util.Map; */ public class RPCTargetPool { - private final Map<String, Entry> targets = new HashMap<String, Entry>(); + private final Map<String, Entry> targets = new HashMap<>(); private final Timer timer; private final long expireMillis; + private final int numTargetsPerSpec; /** * Constructs a new instance of this class, and registers the {@link SystemTimer} for detecting and closing @@ -27,8 +29,8 @@ public class RPCTargetPool { * * @param expireSecs The number of seconds until an idle connection is closed. */ - public RPCTargetPool(double expireSecs) { - this(SystemTimer.INSTANCE, expireSecs); + public RPCTargetPool(double expireSecs, int numTargetsPerSpec) { + this(SystemTimer.INSTANCE, expireSecs, numTargetsPerSpec); } /** @@ -38,9 +40,10 @@ public class RPCTargetPool { * @param timer The timer to use for connection expiration. * @param expireSecs The number of seconds until an idle connection is closed. */ - public RPCTargetPool(Timer timer, double expireSecs) { + public RPCTargetPool(Timer timer, double expireSecs, int numTargetsPerSpec) { this.timer = timer; this.expireMillis = (long)(expireSecs * 1000); + this.numTargetsPerSpec = numTargetsPerSpec; } /** @@ -56,9 +59,8 @@ public class RPCTargetPool { long expireTime = currentTime - expireMillis; while (it.hasNext()) { Entry entry = it.next(); - RPCTarget target = entry.target; - if (target.getJRTTarget().isValid()) { - if (target.getRefCount() > 1) { + if (entry.isValid()) { + if (entry.getRefCount() > 1) { entry.lastUse = currentTime; continue; // someone is using this } @@ -68,7 +70,7 @@ public class RPCTargetPool { } } } - target.subRef(); + entry.close(); it.remove(); } } @@ -85,23 +87,25 @@ public class RPCTargetPool { public RPCTarget getTarget(Supervisor orb, RPCServiceAddress address) { Spec spec = address.getConnectionSpec(); String key = spec.toString(); - RPCTarget ret; + long now = timer.milliTime(); synchronized (this) { Entry entry = targets.get(key); if (entry != null) { - if (entry.target.getJRTTarget().isValid()) { - entry.target.addRef(); - entry.lastUse = timer.milliTime(); - return entry.target; + RPCTarget target = entry.getTarget(now); + if (target != null) { + return target; } - entry.target.subRef(); + entry.close(); targets.remove(key); } - ret = new RPCTarget(spec, orb); - targets.put(key, new Entry(ret, timer.milliTime())); + RPCTarget [] tmpTargets = new RPCTarget[numTargetsPerSpec]; + for (int i=0; i < tmpTargets.length; i++) { + tmpTargets[i] = new RPCTarget(spec, orb); + } + entry = new Entry(tmpTargets, now); + targets.put(key, entry); + return entry.getTarget(now); } - ret.addRef(); - return ret; } @@ -118,14 +122,51 @@ public class RPCTargetPool { * Implements a helper class holds the necessary reference and timestamp of a target. The lastUse member is updated * when a call to {@link RPCTargetPool#flushTargets(boolean)} iterates over an active target. */ - private static class Entry { + private static class Entry implements Closeable { - final RPCTarget target; - long lastUse = 0; + private final RPCTarget [] targets; + private int index; + long lastUse; - Entry(RPCTarget target, long lastUse) { - this.target = target; + Entry(RPCTarget [] targets, long lastUse) { + this.targets = targets; this.lastUse = lastUse; } + RPCTarget getTarget(long now) { + if (index >= targets.length) { + index = 0; + } + RPCTarget target = targets[index]; + if (target.getJRTTarget().isValid()) { + target.addRef(); + lastUse = now; + index++; + return target; + } + return null; + } + boolean isValid() { + for (RPCTarget target : targets) { + if ( ! target.getJRTTarget().isValid()) { + return false; + } + } + return true; + } + + int getRefCount() { + int refCount = 0; + for (RPCTarget target : targets) { + refCount += target.getRefCount(); + } + return refCount; + } + + @Override + public void close() { + for (RPCTarget target : targets) { + target.subRef(); + } + } } } diff --git a/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/TargetPoolTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/TargetPoolTestCase.java index a75a765d5a8..71dd7ba1da2 100755 --- a/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/TargetPoolTestCase.java +++ b/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/TargetPoolTestCase.java @@ -11,12 +11,13 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; -import java.net.UnknownHostException; import java.util.ArrayList; import java.util.List; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertSame; /** * @author Simon Thoresen Hult @@ -44,14 +45,45 @@ public class TargetPoolTestCase { } @Test - public void testConnectionExpire() throws ListenFailedException, UnknownHostException { + public void testConnectionCycling() { + // Necessary setup to be able to resolve targets. + RPCServiceAddress adr1 = registerServer(); + + PoolTimer timer = new PoolTimer(); + RPCTargetPool pool1 = new RPCTargetPool(timer, 0.666, 1); + + RPCTarget target1 = pool1.getTarget(orb, adr1); + RPCTarget target2 = pool1.getTarget(orb, adr1); + assertSame(target1, target2); + target1.subRef(); + target2.subRef(); + + RPCTargetPool pool3 = new RPCTargetPool(timer, 0.666, 3); + + target1 = pool3.getTarget(orb, adr1); + target2 = pool3.getTarget(orb, adr1); + RPCTarget target3 = pool3.getTarget(orb, adr1); + assertNotSame(target1, target2); + assertNotSame(target2, target3); + assertNotSame(target3, target1); + + + RPCTarget target4 = pool3.getTarget(orb, adr1); + assertSame(target1, target4); + target1.subRef(); + target2.subRef(); + target3.subRef(); + target4.subRef(); + } + @Test + public void testConnectionExpire() { // Necessary setup to be able to resolve targets. RPCServiceAddress adr1 = registerServer(); RPCServiceAddress adr2 = registerServer(); RPCServiceAddress adr3 = registerServer(); PoolTimer timer = new PoolTimer(); - RPCTargetPool pool = new RPCTargetPool(timer, 0.666); + RPCTargetPool pool = new RPCTargetPool(timer, 0.666, 1); // Assert that all connections expire. RPCTarget target; |