summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/network/rpc/NamedRPCService.java48
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCService.java60
-rwxr-xr-xmessagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCServiceAddress.java7
-rwxr-xr-xmessagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCServicePool.java2
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/network/rpc/TcpRPCService.java23
-rwxr-xr-xmessagebus/src/test/java/com/yahoo/messagebus/network/rpc/ServiceAddressTestCase.java4
6 files changed, 88 insertions, 56 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/NamedRPCService.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/NamedRPCService.java
new file mode 100644
index 00000000000..59cafed1836
--- /dev/null
+++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/NamedRPCService.java
@@ -0,0 +1,48 @@
+package com.yahoo.messagebus.network.rpc;
+
+import com.yahoo.jrt.slobrok.api.IMirror;
+import com.yahoo.jrt.slobrok.api.Mirror;
+
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+public class NamedRPCService implements RPCService {
+ private final IMirror mirror;
+ private final String pattern;
+ private int addressIdx = ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE);
+ private int addressGen = 0;
+ private List<Mirror.Entry> addressList = null;
+
+ /**
+ * Create a new RPCService backed by the given network and using the given service pattern.
+ *
+ * @param mirror The naming server to send queries to.
+ * @param pattern The pattern to use when querying.
+ */
+ public NamedRPCService(IMirror mirror, String pattern) {
+ this.mirror = mirror;
+ this.pattern = pattern;
+ }
+
+ /**
+ * Resolve a concrete address from this service. This service may represent multiple remote sessions, so this will
+ * select one that is online.
+ *
+ * @return A concrete service address.
+ */
+ public synchronized RPCServiceAddress resolve() {
+ if (addressGen != mirror.updates()) {
+ addressGen = mirror.updates();
+ addressList = mirror.lookup(pattern);
+ }
+ if (addressList != null && !addressList.isEmpty()) {
+ ++addressIdx;
+ if (addressIdx >= addressList.size()) {
+ addressIdx = 0;
+ }
+ Mirror.Entry entry = addressList.get(addressIdx);
+ return new RPCServiceAddress(entry.getName(), entry.getSpec());
+ }
+ return null;
+ }
+}
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCService.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCService.java
index fb3c4cf9971..889df32ce1e 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCService.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCService.java
@@ -2,10 +2,6 @@
package com.yahoo.messagebus.network.rpc;
import com.yahoo.jrt.slobrok.api.IMirror;
-import com.yahoo.jrt.slobrok.api.Mirror;
-
-import java.util.List;
-import java.util.concurrent.ThreadLocalRandom;
/**
* An RPCService represents a set of remote sessions matching a service pattern. The sessions are monitored using the
@@ -13,23 +9,13 @@ import java.util.concurrent.ThreadLocalRandom;
*
* @author havardpe
*/
-public class RPCService {
-
- private final IMirror mirror;
- private final String pattern;
- private int addressIdx = ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE);
- private int addressGen = 0;
- private List<Mirror.Entry> addressList = null;
+public interface RPCService {
- /**
- * Create a new RPCService backed by the given network and using the given service pattern.
- *
- * @param mirror The naming server to send queries to.
- * @param pattern The pattern to use when querying.
- */
- public RPCService(IMirror mirror, String pattern) {
- this.mirror = mirror;
- this.pattern = pattern;
+ static RPCService create(IMirror mirror, String pattern) {
+ if (pattern.startsWith("tcp/")) {
+ return new TcpRPCService(pattern);
+ }
+ return new NamedRPCService(mirror, pattern);
}
/**
@@ -38,38 +24,6 @@ public class RPCService {
*
* @return A concrete service address.
*/
- public RPCServiceAddress resolve() {
- if (pattern.startsWith("tcp/")) {
- int pos = pattern.lastIndexOf('/');
- if (pos > 0 && pos < pattern.length() - 1) {
- RPCServiceAddress ret = new RPCServiceAddress(pattern, pattern.substring(0, pos));
- if (!ret.isMalformed()) {
- return ret;
- }
- }
- } else {
- if (addressGen != mirror.updates()) {
- addressGen = mirror.updates();
- addressList = mirror.lookup(pattern);
- }
- if (addressList != null && !addressList.isEmpty()) {
- ++addressIdx;
- if (addressIdx >= addressList.size()) {
- addressIdx = 0;
- }
- Mirror.Entry entry = addressList.get(addressIdx);
- return new RPCServiceAddress(entry.getName(), entry.getSpec());
- }
- }
- return null;
- }
+ RPCServiceAddress resolve();
- /**
- * Returns the pattern used when querying for the naming server for addresses. This is given at construtor time.
- *
- * @return The service pattern.
- */
- String getPattern() {
- return pattern;
- }
}
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCServiceAddress.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCServiceAddress.java
index 0a6a58d4e89..1b7bcf01731 100755
--- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCServiceAddress.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCServiceAddress.java
@@ -34,6 +34,13 @@ public class RPCServiceAddress implements ServiceAddress {
public RPCServiceAddress(String serviceName, String connectionSpec) {
this(serviceName, new Spec(connectionSpec));
}
+ public RPCServiceAddress(RPCServiceAddress blueprint) {
+ serviceName = blueprint.serviceName;
+ sessionName = blueprint.sessionName;
+ connectionSpec = blueprint.connectionSpec;
+ target = null;
+ }
+
@Override
public boolean equals(Object obj) {
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCServicePool.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCServicePool.java
index abd33d6c9c2..e67688e1fca 100755
--- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCServicePool.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCServicePool.java
@@ -36,7 +36,7 @@ public class RPCServicePool {
public RPCServiceAddress resolve(String pattern) {
RPCService service = services.get().get(pattern);
if (service == null) {
- service = new RPCService(net.getMirror(), pattern);
+ service = RPCService.create(net.getMirror(), pattern);
services.get().put(pattern, service);
}
return service.resolve();
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/TcpRPCService.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/TcpRPCService.java
new file mode 100644
index 00000000000..e2fae59b429
--- /dev/null
+++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/TcpRPCService.java
@@ -0,0 +1,23 @@
+package com.yahoo.messagebus.network.rpc;
+
+public class TcpRPCService implements RPCService {
+ private final RPCServiceAddress blueprint;
+
+ TcpRPCService(String pattern) {
+ if ( ! pattern.startsWith("tcp/")) {
+ throw new IllegalArgumentException("Expect tcp adress to start with 'tcp/', was: " + pattern);
+ }
+ RPCServiceAddress ret = null;
+ int pos = pattern.lastIndexOf('/');
+ if (pos > 0 && pos < pattern.length() - 1) {
+ ret = new RPCServiceAddress(pattern, pattern.substring(0, pos));
+ if ( ret.isMalformed()) {
+ ret = null;
+ }
+ }
+ blueprint = ret;
+ }
+ public RPCServiceAddress resolve() {
+ return blueprint != null ? new RPCServiceAddress(blueprint) : null;
+ }
+}
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/ServiceAddressTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/ServiceAddressTestCase.java
index 1dbb30de585..0343b075579 100755
--- a/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/ServiceAddressTestCase.java
+++ b/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/ServiceAddressTestCase.java
@@ -83,11 +83,11 @@ public class ServiceAddressTestCase {
}
private void assertNullAddress(String pattern) {
- assertNull(new RPCService(network.getMirror(), pattern).resolve());
+ assertNull(RPCService.create(network.getMirror(), pattern).resolve());
}
private void assertAddress(String pattern, String expectedSpec, String expectedSession) {
- RPCService service = new RPCService(network.getMirror(), pattern);
+ RPCService service = RPCService.create(network.getMirror(), pattern);
RPCServiceAddress obj = service.resolve();
assertNotNull(obj);
assertNotNull(obj.getConnectionSpec());