summaryrefslogtreecommitdiffstats
path: root/messagebus/src/main/java/com
diff options
context:
space:
mode:
Diffstat (limited to 'messagebus/src/main/java/com')
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/network/NetworkMultiplexer.java35
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/network/rpc/NamedRPCService.java48
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCService.java60
-rwxr-xr-xmessagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCServiceAddress.java7
-rwxr-xr-xmessagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCServicePool.java35
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/network/rpc/TcpRPCService.java23
6 files changed, 121 insertions, 87 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/NetworkMultiplexer.java b/messagebus/src/main/java/com/yahoo/messagebus/network/NetworkMultiplexer.java
index e75b5d0934a..4bd9bbf7f7f 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/network/NetworkMultiplexer.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/network/NetworkMultiplexer.java
@@ -7,10 +7,9 @@ import com.yahoo.text.Utf8Array;
import java.util.Deque;
import java.util.Map;
-import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
/**
@@ -33,15 +32,16 @@ public class NetworkMultiplexer implements NetworkOwner {
private final Network net;
private final Deque<NetworkOwner> owners = new ConcurrentLinkedDeque<>();
private final Map<String, Deque<NetworkOwner>> sessions = new ConcurrentHashMap<>();
- private final boolean shared;
+ private final AtomicBoolean disowned;
private NetworkMultiplexer(Network net, boolean shared) {
net.attach(this);
this.net = net;
- this.shared = shared;
+ this.disowned = new AtomicBoolean( ! shared);
}
- /** Returns a network multiplexer which will be shared between several {@link NetworkOwner}s. */
+ /** Returns a network multiplexer which will be shared between several {@link NetworkOwner}s,
+ * and will shut down when all these have detached, and {@link #disown()} has been called, in any order. */
public static NetworkMultiplexer shared(Network net) {
return new NetworkMultiplexer(net, true);
}
@@ -100,6 +100,7 @@ public class NetworkMultiplexer implements NetworkOwner {
owner.deliverMessage(message, session);
}
+ /** Attach the network owner to this, allowing this to forward messages to it. */
public void attach(NetworkOwner owner) {
if (owners.contains(owner))
throw new IllegalArgumentException(owner + " is already attached to this");
@@ -107,23 +108,27 @@ public class NetworkMultiplexer implements NetworkOwner {
owners.add(owner);
}
+ /** Detach the network owner from this, no longer allowing messages to it, and shutting down this is ownerless. */
public void detach(NetworkOwner owner) {
if ( ! owners.remove(owner))
throw new IllegalArgumentException(owner + " not attached to this");
- if ( ! shared && owners.isEmpty())
- net.shutdown();
+ destroyIfOwnerless();
}
- public void destroy() {
- if ( ! shared)
- throw new UnsupportedOperationException("Destroy called on a dedicated multiplexer; " +
- "this automatically shuts down when detached from");
+ /** Signal that external ownership of this is relinquished, allowing destruction on last owner detachment. */
+ public void disown() {
+ if (disowned.getAndSet(true))
+ throw new IllegalStateException("Destroy called on a dedicated multiplexer--" +
+ "this automatically shuts down when detached from--or " +
+ "called multiple times on a shared multiplexer");
- if ( ! owners.isEmpty())
- log.warning("NetworkMultiplexer destroyed before all owners detached: " + this);
+ destroyIfOwnerless();
+ }
- net.shutdown();
+ private void destroyIfOwnerless() {
+ if (disowned.get() && owners.isEmpty())
+ net.shutdown();
}
public Network net() {
@@ -136,7 +141,7 @@ public class NetworkMultiplexer implements NetworkOwner {
"net=" + net +
", owners=" + owners +
", sessions=" + sessions +
- ", shared=" + shared +
+ ", destructible=" + disowned +
'}';
}
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/NamedRPCService.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/NamedRPCService.java
new file mode 100644
index 00000000000..59cafed1836
--- /dev/null
+++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/NamedRPCService.java
@@ -0,0 +1,48 @@
+package com.yahoo.messagebus.network.rpc;
+
+import com.yahoo.jrt.slobrok.api.IMirror;
+import com.yahoo.jrt.slobrok.api.Mirror;
+
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+public class NamedRPCService implements RPCService {
+ private final IMirror mirror;
+ private final String pattern;
+ private int addressIdx = ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE);
+ private int addressGen = 0;
+ private List<Mirror.Entry> addressList = null;
+
+ /**
+ * Create a new RPCService backed by the given network and using the given service pattern.
+ *
+ * @param mirror The naming server to send queries to.
+ * @param pattern The pattern to use when querying.
+ */
+ public NamedRPCService(IMirror mirror, String pattern) {
+ this.mirror = mirror;
+ this.pattern = pattern;
+ }
+
+ /**
+ * Resolve a concrete address from this service. This service may represent multiple remote sessions, so this will
+ * select one that is online.
+ *
+ * @return A concrete service address.
+ */
+ public synchronized RPCServiceAddress resolve() {
+ if (addressGen != mirror.updates()) {
+ addressGen = mirror.updates();
+ addressList = mirror.lookup(pattern);
+ }
+ if (addressList != null && !addressList.isEmpty()) {
+ ++addressIdx;
+ if (addressIdx >= addressList.size()) {
+ addressIdx = 0;
+ }
+ Mirror.Entry entry = addressList.get(addressIdx);
+ return new RPCServiceAddress(entry.getName(), entry.getSpec());
+ }
+ return null;
+ }
+}
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCService.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCService.java
index fb3c4cf9971..889df32ce1e 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCService.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCService.java
@@ -2,10 +2,6 @@
package com.yahoo.messagebus.network.rpc;
import com.yahoo.jrt.slobrok.api.IMirror;
-import com.yahoo.jrt.slobrok.api.Mirror;
-
-import java.util.List;
-import java.util.concurrent.ThreadLocalRandom;
/**
* An RPCService represents a set of remote sessions matching a service pattern. The sessions are monitored using the
@@ -13,23 +9,13 @@ import java.util.concurrent.ThreadLocalRandom;
*
* @author havardpe
*/
-public class RPCService {
-
- private final IMirror mirror;
- private final String pattern;
- private int addressIdx = ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE);
- private int addressGen = 0;
- private List<Mirror.Entry> addressList = null;
+public interface RPCService {
- /**
- * Create a new RPCService backed by the given network and using the given service pattern.
- *
- * @param mirror The naming server to send queries to.
- * @param pattern The pattern to use when querying.
- */
- public RPCService(IMirror mirror, String pattern) {
- this.mirror = mirror;
- this.pattern = pattern;
+ static RPCService create(IMirror mirror, String pattern) {
+ if (pattern.startsWith("tcp/")) {
+ return new TcpRPCService(pattern);
+ }
+ return new NamedRPCService(mirror, pattern);
}
/**
@@ -38,38 +24,6 @@ public class RPCService {
*
* @return A concrete service address.
*/
- public RPCServiceAddress resolve() {
- if (pattern.startsWith("tcp/")) {
- int pos = pattern.lastIndexOf('/');
- if (pos > 0 && pos < pattern.length() - 1) {
- RPCServiceAddress ret = new RPCServiceAddress(pattern, pattern.substring(0, pos));
- if (!ret.isMalformed()) {
- return ret;
- }
- }
- } else {
- if (addressGen != mirror.updates()) {
- addressGen = mirror.updates();
- addressList = mirror.lookup(pattern);
- }
- if (addressList != null && !addressList.isEmpty()) {
- ++addressIdx;
- if (addressIdx >= addressList.size()) {
- addressIdx = 0;
- }
- Mirror.Entry entry = addressList.get(addressIdx);
- return new RPCServiceAddress(entry.getName(), entry.getSpec());
- }
- }
- return null;
- }
+ RPCServiceAddress resolve();
- /**
- * Returns the pattern used when querying for the naming server for addresses. This is given at construtor time.
- *
- * @return The service pattern.
- */
- String getPattern() {
- return pattern;
- }
}
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCServiceAddress.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCServiceAddress.java
index 0a6a58d4e89..1b7bcf01731 100755
--- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCServiceAddress.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCServiceAddress.java
@@ -34,6 +34,13 @@ public class RPCServiceAddress implements ServiceAddress {
public RPCServiceAddress(String serviceName, String connectionSpec) {
this(serviceName, new Spec(connectionSpec));
}
+ public RPCServiceAddress(RPCServiceAddress blueprint) {
+ serviceName = blueprint.serviceName;
+ sessionName = blueprint.sessionName;
+ connectionSpec = blueprint.connectionSpec;
+ target = null;
+ }
+
@Override
public boolean equals(Object obj) {
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCServicePool.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCServicePool.java
index abd33d6c9c2..a666a03c401 100755
--- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCServicePool.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCServicePool.java
@@ -1,6 +1,8 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.messagebus.network.rpc;
+import com.yahoo.concurrent.CopyOnWriteHashMap;
+
import java.util.LinkedHashMap;
import java.util.Map;
@@ -12,7 +14,7 @@ import java.util.Map;
public class RPCServicePool {
private final RPCNetwork net;
- private final ThreadLocalCache services = new ThreadLocalCache();
+ private final Map<Long, ServiceLRUCache> mapOfServiceCache;
private final int maxSize;
/**
@@ -23,6 +25,7 @@ public class RPCServicePool {
*/
public RPCServicePool(RPCNetwork net, int maxSize) {
this.net = net;
+ mapOfServiceCache = new CopyOnWriteHashMap<>();
this.maxSize = maxSize;
}
@@ -34,12 +37,12 @@ public class RPCServicePool {
* @return A service address for the given pattern.
*/
public RPCServiceAddress resolve(String pattern) {
- RPCService service = services.get().get(pattern);
- if (service == null) {
- service = new RPCService(net.getMirror(), pattern);
- services.get().put(pattern, service);
- }
- return service.resolve();
+
+ return getPerThreadCache().computeIfAbsent(pattern, (key) -> RPCService.create(net.getMirror(), key)).resolve();
+ }
+
+ private ServiceLRUCache getPerThreadCache() {
+ return mapOfServiceCache.computeIfAbsent(Thread.currentThread().getId(), (key) -> new ServiceLRUCache(maxSize));
}
/**
@@ -49,7 +52,7 @@ public class RPCServicePool {
* @return The current size of this pool.
*/
public int getSize() {
- return services.get().size();
+ return getPerThreadCache().size();
}
/**
@@ -59,21 +62,15 @@ public class RPCServicePool {
* @return True if a corresponding service is in the pool.
*/
public boolean hasService(String pattern) {
- return services.get().containsKey(pattern);
- }
-
- private class ThreadLocalCache extends ThreadLocal<ServiceLRUCache> {
-
- @Override
- protected ServiceLRUCache initialValue() {
- return new ServiceLRUCache();
- }
+ return getPerThreadCache().containsKey(pattern);
}
- private class ServiceLRUCache extends LinkedHashMap<String, RPCService> {
+ private static class ServiceLRUCache extends LinkedHashMap<String, RPCService> {
+ private final int maxSize;
- ServiceLRUCache() {
+ ServiceLRUCache(int maxSize) {
super(16, 0.75f, true);
+ this.maxSize = maxSize;
}
@Override
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/TcpRPCService.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/TcpRPCService.java
new file mode 100644
index 00000000000..e2fae59b429
--- /dev/null
+++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/TcpRPCService.java
@@ -0,0 +1,23 @@
+package com.yahoo.messagebus.network.rpc;
+
+public class TcpRPCService implements RPCService {
+ private final RPCServiceAddress blueprint;
+
+ TcpRPCService(String pattern) {
+ if ( ! pattern.startsWith("tcp/")) {
+ throw new IllegalArgumentException("Expect tcp adress to start with 'tcp/', was: " + pattern);
+ }
+ RPCServiceAddress ret = null;
+ int pos = pattern.lastIndexOf('/');
+ if (pos > 0 && pos < pattern.length() - 1) {
+ ret = new RPCServiceAddress(pattern, pattern.substring(0, pos));
+ if ( ret.isMalformed()) {
+ ret = null;
+ }
+ }
+ blueprint = ret;
+ }
+ public RPCServiceAddress resolve() {
+ return blueprint != null ? new RPCServiceAddress(blueprint) : null;
+ }
+}