aboutsummaryrefslogtreecommitdiffstats
path: root/messagebus
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2019-05-19 13:47:22 +0200
committerHenning Baldersheim <balder@yahoo-inc.com>2019-05-19 13:47:22 +0200
commit920585840be54248b6df82fee258be8313274337 (patch)
tree18f73dbbe5dca0cc5a14ba7f12eeb8c1a196bb7a /messagebus
parent3d6512745fc195494a4614e941423ed398d3f9bc (diff)
Prepare for using multiple connections per spec
Diffstat (limited to 'messagebus')
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java2
-rwxr-xr-xmessagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetworkParams.java10
-rwxr-xr-xmessagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCTargetPool.java87
-rwxr-xr-xmessagebus/src/test/java/com/yahoo/messagebus/network/rpc/TargetPoolTestCase.java38
4 files changed, 110 insertions, 27 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java
index 4a263aaae4c..adf889a7b6f 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java
@@ -85,7 +85,7 @@ public class RPCNetwork implements Network, MethodHandler {
orb = new Supervisor(new Transport(2));
orb.setMaxInputBufferSize(params.getMaxInputBufferSize());
orb.setMaxOutputBufferSize(params.getMaxOutputBufferSize());
- targetPool = new RPCTargetPool(params.getConnectionExpireSecs());
+ targetPool = new RPCTargetPool(params.getConnectionExpireSecs(), params.getNumTargetsPerSpec());
servicePool = new RPCServicePool(this, 4096);
Method method = new Method("mbus.getVersion", "", "s", this);
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetworkParams.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetworkParams.java
index e50670530d3..0517d42659d 100755
--- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetworkParams.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetworkParams.java
@@ -19,6 +19,7 @@ public class RPCNetworkParams {
private int maxInputBufferSize = 256 * 1024;
private int maxOutputBufferSize = 256 * 1024;
private double connectionExpireSecs = 30;
+ private int numTargetsPerSpec = 1;
/**
* Constructs a new instance of this class with reasonable default values.
@@ -40,6 +41,7 @@ public class RPCNetworkParams {
connectionExpireSecs = params.connectionExpireSecs;
maxInputBufferSize = params.maxInputBufferSize;
maxOutputBufferSize = params.maxOutputBufferSize;
+ numTargetsPerSpec = params.numTargetsPerSpec;
}
/**
@@ -142,6 +144,14 @@ public class RPCNetworkParams {
return this;
}
+ RPCNetworkParams setNumTargetsPerSpec(int numTargetsPerSpec) {
+ this.numTargetsPerSpec = numTargetsPerSpec;
+ return this;
+ }
+ int getNumTargetsPerSpec() {
+ return numTargetsPerSpec;
+ }
+
/**
* Returns the maximum input buffer size allowed for the underlying FNET connection.
*
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCTargetPool.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCTargetPool.java
index 3ae73c1fea0..e681f0834a8 100755
--- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCTargetPool.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCTargetPool.java
@@ -6,6 +6,7 @@ import com.yahoo.jrt.Supervisor;
import com.yahoo.concurrent.SystemTimer;
import com.yahoo.concurrent.Timer;
+import java.io.Closeable;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
@@ -17,9 +18,10 @@ import java.util.Map;
*/
public class RPCTargetPool {
- private final Map<String, Entry> targets = new HashMap<String, Entry>();
+ private final Map<String, Entry> targets = new HashMap<>();
private final Timer timer;
private final long expireMillis;
+ private final int numTargetsPerSpec;
/**
* Constructs a new instance of this class, and registers the {@link SystemTimer} for detecting and closing
@@ -27,8 +29,8 @@ public class RPCTargetPool {
*
* @param expireSecs The number of seconds until an idle connection is closed.
*/
- public RPCTargetPool(double expireSecs) {
- this(SystemTimer.INSTANCE, expireSecs);
+ public RPCTargetPool(double expireSecs, int numTargetsPerSpec) {
+ this(SystemTimer.INSTANCE, expireSecs, numTargetsPerSpec);
}
/**
@@ -38,9 +40,10 @@ public class RPCTargetPool {
* @param timer The timer to use for connection expiration.
* @param expireSecs The number of seconds until an idle connection is closed.
*/
- public RPCTargetPool(Timer timer, double expireSecs) {
+ public RPCTargetPool(Timer timer, double expireSecs, int numTargetsPerSpec) {
this.timer = timer;
this.expireMillis = (long)(expireSecs * 1000);
+ this.numTargetsPerSpec = numTargetsPerSpec;
}
/**
@@ -56,9 +59,8 @@ public class RPCTargetPool {
long expireTime = currentTime - expireMillis;
while (it.hasNext()) {
Entry entry = it.next();
- RPCTarget target = entry.target;
- if (target.getJRTTarget().isValid()) {
- if (target.getRefCount() > 1) {
+ if (entry.isValid()) {
+ if (entry.getRefCount() > 1) {
entry.lastUse = currentTime;
continue; // someone is using this
}
@@ -68,7 +70,7 @@ public class RPCTargetPool {
}
}
}
- target.subRef();
+ entry.close();
it.remove();
}
}
@@ -85,23 +87,25 @@ public class RPCTargetPool {
public RPCTarget getTarget(Supervisor orb, RPCServiceAddress address) {
Spec spec = address.getConnectionSpec();
String key = spec.toString();
- RPCTarget ret;
+ long now = timer.milliTime();
synchronized (this) {
Entry entry = targets.get(key);
if (entry != null) {
- if (entry.target.getJRTTarget().isValid()) {
- entry.target.addRef();
- entry.lastUse = timer.milliTime();
- return entry.target;
+ RPCTarget target = entry.getTarget(now);
+ if (target != null) {
+ return target;
}
- entry.target.subRef();
+ entry.close();
targets.remove(key);
}
- ret = new RPCTarget(spec, orb);
- targets.put(key, new Entry(ret, timer.milliTime()));
+ RPCTarget [] tmpTargets = new RPCTarget[numTargetsPerSpec];
+ for (int i=0; i < tmpTargets.length; i++) {
+ tmpTargets[i] = new RPCTarget(spec, orb);
+ }
+ entry = new Entry(tmpTargets, now);
+ targets.put(key, entry);
+ return entry.getTarget(now);
}
- ret.addRef();
- return ret;
}
@@ -118,14 +122,51 @@ public class RPCTargetPool {
* Implements a helper class holds the necessary reference and timestamp of a target. The lastUse member is updated
* when a call to {@link RPCTargetPool#flushTargets(boolean)} iterates over an active target.
*/
- private static class Entry {
+ private static class Entry implements Closeable {
- final RPCTarget target;
- long lastUse = 0;
+ private final RPCTarget [] targets;
+ private int index;
+ long lastUse;
- Entry(RPCTarget target, long lastUse) {
- this.target = target;
+ Entry(RPCTarget [] targets, long lastUse) {
+ this.targets = targets;
this.lastUse = lastUse;
}
+ RPCTarget getTarget(long now) {
+ if (index >= targets.length) {
+ index = 0;
+ }
+ RPCTarget target = targets[index];
+ if (target.getJRTTarget().isValid()) {
+ target.addRef();
+ lastUse = now;
+ index++;
+ return target;
+ }
+ return null;
+ }
+ boolean isValid() {
+ for (RPCTarget target : targets) {
+ if ( ! target.getJRTTarget().isValid()) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ int getRefCount() {
+ int refCount = 0;
+ for (RPCTarget target : targets) {
+ refCount += target.getRefCount();
+ }
+ return refCount;
+ }
+
+ @Override
+ public void close() {
+ for (RPCTarget target : targets) {
+ target.subRef();
+ }
+ }
}
}
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/TargetPoolTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/TargetPoolTestCase.java
index a75a765d5a8..71dd7ba1da2 100755
--- a/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/TargetPoolTestCase.java
+++ b/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/TargetPoolTestCase.java
@@ -11,12 +11,13 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
/**
* @author Simon Thoresen Hult
@@ -44,14 +45,45 @@ public class TargetPoolTestCase {
}
@Test
- public void testConnectionExpire() throws ListenFailedException, UnknownHostException {
+ public void testConnectionCycling() {
+ // Necessary setup to be able to resolve targets.
+ RPCServiceAddress adr1 = registerServer();
+
+ PoolTimer timer = new PoolTimer();
+ RPCTargetPool pool1 = new RPCTargetPool(timer, 0.666, 1);
+
+ RPCTarget target1 = pool1.getTarget(orb, adr1);
+ RPCTarget target2 = pool1.getTarget(orb, adr1);
+ assertSame(target1, target2);
+ target1.subRef();
+ target2.subRef();
+
+ RPCTargetPool pool3 = new RPCTargetPool(timer, 0.666, 3);
+
+ target1 = pool3.getTarget(orb, adr1);
+ target2 = pool3.getTarget(orb, adr1);
+ RPCTarget target3 = pool3.getTarget(orb, adr1);
+ assertNotSame(target1, target2);
+ assertNotSame(target2, target3);
+ assertNotSame(target3, target1);
+
+
+ RPCTarget target4 = pool3.getTarget(orb, adr1);
+ assertSame(target1, target4);
+ target1.subRef();
+ target2.subRef();
+ target3.subRef();
+ target4.subRef();
+ }
+ @Test
+ public void testConnectionExpire() {
// Necessary setup to be able to resolve targets.
RPCServiceAddress adr1 = registerServer();
RPCServiceAddress adr2 = registerServer();
RPCServiceAddress adr3 = registerServer();
PoolTimer timer = new PoolTimer();
- RPCTargetPool pool = new RPCTargetPool(timer, 0.666);
+ RPCTargetPool pool = new RPCTargetPool(timer, 0.666, 1);
// Assert that all connections expire.
RPCTarget target;