summaryrefslogtreecommitdiffstats
path: root/messagebus
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2021-09-24 23:13:09 +0200
committerHenning Baldersheim <balder@yahoo-inc.com>2021-09-24 23:13:09 +0200
commitd9d73b699aea14f33796c69a00ac036f893764c5 (patch)
tree6c6dce20eb7137dd3f7ea2f212af442f295547ed /messagebus
parent9dabf1ce317325a334fd5f6aac531456acea17c4 (diff)
As a simple tcp based rpc address is the most common, split into Tcp and Named.
Tcp variant is immutable and hence thread safe, while Named is made thread safe by synchonized.
Diffstat (limited to 'messagebus')
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/network/rpc/NamedRPCService.java48
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCService.java60
-rwxr-xr-xmessagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCServiceAddress.java7
-rwxr-xr-xmessagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCServicePool.java2
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/network/rpc/TcpRPCService.java23
-rwxr-xr-xmessagebus/src/test/java/com/yahoo/messagebus/network/rpc/ServiceAddressTestCase.java4
6 files changed, 88 insertions, 56 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/NamedRPCService.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/NamedRPCService.java
new file mode 100644
index 00000000000..59cafed1836
--- /dev/null
+++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/NamedRPCService.java
@@ -0,0 +1,48 @@
+package com.yahoo.messagebus.network.rpc;
+
+import com.yahoo.jrt.slobrok.api.IMirror;
+import com.yahoo.jrt.slobrok.api.Mirror;
+
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+public class NamedRPCService implements RPCService {
+ private final IMirror mirror;
+ private final String pattern;
+ private int addressIdx = ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE);
+ private int addressGen = 0;
+ private List<Mirror.Entry> addressList = null;
+
+ /**
+ * Create a new RPCService backed by the given network and using the given service pattern.
+ *
+ * @param mirror The naming server to send queries to.
+ * @param pattern The pattern to use when querying.
+ */
+ public NamedRPCService(IMirror mirror, String pattern) {
+ this.mirror = mirror;
+ this.pattern = pattern;
+ }
+
+ /**
+ * Resolve a concrete address from this service. This service may represent multiple remote sessions, so this will
+ * select one that is online.
+ *
+ * @return A concrete service address.
+ */
+ public synchronized RPCServiceAddress resolve() {
+ if (addressGen != mirror.updates()) {
+ addressGen = mirror.updates();
+ addressList = mirror.lookup(pattern);
+ }
+ if (addressList != null && !addressList.isEmpty()) {
+ ++addressIdx;
+ if (addressIdx >= addressList.size()) {
+ addressIdx = 0;
+ }
+ Mirror.Entry entry = addressList.get(addressIdx);
+ return new RPCServiceAddress(entry.getName(), entry.getSpec());
+ }
+ return null;
+ }
+}
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCService.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCService.java
index fb3c4cf9971..889df32ce1e 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCService.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCService.java
@@ -2,10 +2,6 @@
package com.yahoo.messagebus.network.rpc;
import com.yahoo.jrt.slobrok.api.IMirror;
-import com.yahoo.jrt.slobrok.api.Mirror;
-
-import java.util.List;
-import java.util.concurrent.ThreadLocalRandom;
/**
* An RPCService represents a set of remote sessions matching a service pattern. The sessions are monitored using the
@@ -13,23 +9,13 @@ import java.util.concurrent.ThreadLocalRandom;
*
* @author havardpe
*/
-public class RPCService {
-
- private final IMirror mirror;
- private final String pattern;
- private int addressIdx = ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE);
- private int addressGen = 0;
- private List<Mirror.Entry> addressList = null;
+public interface RPCService {
- /**
- * Create a new RPCService backed by the given network and using the given service pattern.
- *
- * @param mirror The naming server to send queries to.
- * @param pattern The pattern to use when querying.
- */
- public RPCService(IMirror mirror, String pattern) {
- this.mirror = mirror;
- this.pattern = pattern;
+ static RPCService create(IMirror mirror, String pattern) {
+ if (pattern.startsWith("tcp/")) {
+ return new TcpRPCService(pattern);
+ }
+ return new NamedRPCService(mirror, pattern);
}
/**
@@ -38,38 +24,6 @@ public class RPCService {
*
* @return A concrete service address.
*/
- public RPCServiceAddress resolve() {
- if (pattern.startsWith("tcp/")) {
- int pos = pattern.lastIndexOf('/');
- if (pos > 0 && pos < pattern.length() - 1) {
- RPCServiceAddress ret = new RPCServiceAddress(pattern, pattern.substring(0, pos));
- if (!ret.isMalformed()) {
- return ret;
- }
- }
- } else {
- if (addressGen != mirror.updates()) {
- addressGen = mirror.updates();
- addressList = mirror.lookup(pattern);
- }
- if (addressList != null && !addressList.isEmpty()) {
- ++addressIdx;
- if (addressIdx >= addressList.size()) {
- addressIdx = 0;
- }
- Mirror.Entry entry = addressList.get(addressIdx);
- return new RPCServiceAddress(entry.getName(), entry.getSpec());
- }
- }
- return null;
- }
+ RPCServiceAddress resolve();
- /**
- * Returns the pattern used when querying for the naming server for addresses. This is given at construtor time.
- *
- * @return The service pattern.
- */
- String getPattern() {
- return pattern;
- }
}
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCServiceAddress.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCServiceAddress.java
index 0a6a58d4e89..1b7bcf01731 100755
--- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCServiceAddress.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCServiceAddress.java
@@ -34,6 +34,13 @@ public class RPCServiceAddress implements ServiceAddress {
public RPCServiceAddress(String serviceName, String connectionSpec) {
this(serviceName, new Spec(connectionSpec));
}
+ public RPCServiceAddress(RPCServiceAddress blueprint) {
+ serviceName = blueprint.serviceName;
+ sessionName = blueprint.sessionName;
+ connectionSpec = blueprint.connectionSpec;
+ target = null;
+ }
+
@Override
public boolean equals(Object obj) {
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCServicePool.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCServicePool.java
index abd33d6c9c2..e67688e1fca 100755
--- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCServicePool.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCServicePool.java
@@ -36,7 +36,7 @@ public class RPCServicePool {
public RPCServiceAddress resolve(String pattern) {
RPCService service = services.get().get(pattern);
if (service == null) {
- service = new RPCService(net.getMirror(), pattern);
+ service = RPCService.create(net.getMirror(), pattern);
services.get().put(pattern, service);
}
return service.resolve();
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/TcpRPCService.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/TcpRPCService.java
new file mode 100644
index 00000000000..e2fae59b429
--- /dev/null
+++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/TcpRPCService.java
@@ -0,0 +1,23 @@
+package com.yahoo.messagebus.network.rpc;
+
+public class TcpRPCService implements RPCService {
+ private final RPCServiceAddress blueprint;
+
+ TcpRPCService(String pattern) {
+ if ( ! pattern.startsWith("tcp/")) {
+ throw new IllegalArgumentException("Expect tcp adress to start with 'tcp/', was: " + pattern);
+ }
+ RPCServiceAddress ret = null;
+ int pos = pattern.lastIndexOf('/');
+ if (pos > 0 && pos < pattern.length() - 1) {
+ ret = new RPCServiceAddress(pattern, pattern.substring(0, pos));
+ if ( ret.isMalformed()) {
+ ret = null;
+ }
+ }
+ blueprint = ret;
+ }
+ public RPCServiceAddress resolve() {
+ return blueprint != null ? new RPCServiceAddress(blueprint) : null;
+ }
+}
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/ServiceAddressTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/ServiceAddressTestCase.java
index 1dbb30de585..0343b075579 100755
--- a/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/ServiceAddressTestCase.java
+++ b/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/ServiceAddressTestCase.java
@@ -83,11 +83,11 @@ public class ServiceAddressTestCase {
}
private void assertNullAddress(String pattern) {
- assertNull(new RPCService(network.getMirror(), pattern).resolve());
+ assertNull(RPCService.create(network.getMirror(), pattern).resolve());
}
private void assertAddress(String pattern, String expectedSpec, String expectedSession) {
- RPCService service = new RPCService(network.getMirror(), pattern);
+ RPCService service = RPCService.create(network.getMirror(), pattern);
RPCServiceAddress obj = service.resolve();
assertNotNull(obj);
assertNotNull(obj.getConnectionSpec());