diff options
Diffstat (limited to 'messagebus/src/main/java/com')
6 files changed, 121 insertions, 87 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/NetworkMultiplexer.java b/messagebus/src/main/java/com/yahoo/messagebus/network/NetworkMultiplexer.java index e75b5d0934a..4bd9bbf7f7f 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/network/NetworkMultiplexer.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/network/NetworkMultiplexer.java @@ -7,10 +7,9 @@ import com.yahoo.text.Utf8Array; import java.util.Deque; import java.util.Map; -import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedDeque; -import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Logger; /** @@ -33,15 +32,16 @@ public class NetworkMultiplexer implements NetworkOwner { private final Network net; private final Deque<NetworkOwner> owners = new ConcurrentLinkedDeque<>(); private final Map<String, Deque<NetworkOwner>> sessions = new ConcurrentHashMap<>(); - private final boolean shared; + private final AtomicBoolean disowned; private NetworkMultiplexer(Network net, boolean shared) { net.attach(this); this.net = net; - this.shared = shared; + this.disowned = new AtomicBoolean( ! shared); } - /** Returns a network multiplexer which will be shared between several {@link NetworkOwner}s. */ + /** Returns a network multiplexer which will be shared between several {@link NetworkOwner}s, + * and will shut down when all these have detached, and {@link #disown()} has been called, in any order. */ public static NetworkMultiplexer shared(Network net) { return new NetworkMultiplexer(net, true); } @@ -100,6 +100,7 @@ public class NetworkMultiplexer implements NetworkOwner { owner.deliverMessage(message, session); } + /** Attach the network owner to this, allowing this to forward messages to it. */ public void attach(NetworkOwner owner) { if (owners.contains(owner)) throw new IllegalArgumentException(owner + " is already attached to this"); @@ -107,23 +108,27 @@ public class NetworkMultiplexer implements NetworkOwner { owners.add(owner); } + /** Detach the network owner from this, no longer allowing messages to it, and shutting down this is ownerless. */ public void detach(NetworkOwner owner) { if ( ! owners.remove(owner)) throw new IllegalArgumentException(owner + " not attached to this"); - if ( ! shared && owners.isEmpty()) - net.shutdown(); + destroyIfOwnerless(); } - public void destroy() { - if ( ! shared) - throw new UnsupportedOperationException("Destroy called on a dedicated multiplexer; " + - "this automatically shuts down when detached from"); + /** Signal that external ownership of this is relinquished, allowing destruction on last owner detachment. */ + public void disown() { + if (disowned.getAndSet(true)) + throw new IllegalStateException("Destroy called on a dedicated multiplexer--" + + "this automatically shuts down when detached from--or " + + "called multiple times on a shared multiplexer"); - if ( ! owners.isEmpty()) - log.warning("NetworkMultiplexer destroyed before all owners detached: " + this); + destroyIfOwnerless(); + } - net.shutdown(); + private void destroyIfOwnerless() { + if (disowned.get() && owners.isEmpty()) + net.shutdown(); } public Network net() { @@ -136,7 +141,7 @@ public class NetworkMultiplexer implements NetworkOwner { "net=" + net + ", owners=" + owners + ", sessions=" + sessions + - ", shared=" + shared + + ", destructible=" + disowned + '}'; } diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/NamedRPCService.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/NamedRPCService.java new file mode 100644 index 00000000000..59cafed1836 --- /dev/null +++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/NamedRPCService.java @@ -0,0 +1,48 @@ +package com.yahoo.messagebus.network.rpc; + +import com.yahoo.jrt.slobrok.api.IMirror; +import com.yahoo.jrt.slobrok.api.Mirror; + +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; + +public class NamedRPCService implements RPCService { + private final IMirror mirror; + private final String pattern; + private int addressIdx = ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE); + private int addressGen = 0; + private List<Mirror.Entry> addressList = null; + + /** + * Create a new RPCService backed by the given network and using the given service pattern. + * + * @param mirror The naming server to send queries to. + * @param pattern The pattern to use when querying. + */ + public NamedRPCService(IMirror mirror, String pattern) { + this.mirror = mirror; + this.pattern = pattern; + } + + /** + * Resolve a concrete address from this service. This service may represent multiple remote sessions, so this will + * select one that is online. + * + * @return A concrete service address. + */ + public synchronized RPCServiceAddress resolve() { + if (addressGen != mirror.updates()) { + addressGen = mirror.updates(); + addressList = mirror.lookup(pattern); + } + if (addressList != null && !addressList.isEmpty()) { + ++addressIdx; + if (addressIdx >= addressList.size()) { + addressIdx = 0; + } + Mirror.Entry entry = addressList.get(addressIdx); + return new RPCServiceAddress(entry.getName(), entry.getSpec()); + } + return null; + } +} diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCService.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCService.java index fb3c4cf9971..889df32ce1e 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCService.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCService.java @@ -2,10 +2,6 @@ package com.yahoo.messagebus.network.rpc; import com.yahoo.jrt.slobrok.api.IMirror; -import com.yahoo.jrt.slobrok.api.Mirror; - -import java.util.List; -import java.util.concurrent.ThreadLocalRandom; /** * An RPCService represents a set of remote sessions matching a service pattern. The sessions are monitored using the @@ -13,23 +9,13 @@ import java.util.concurrent.ThreadLocalRandom; * * @author havardpe */ -public class RPCService { - - private final IMirror mirror; - private final String pattern; - private int addressIdx = ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE); - private int addressGen = 0; - private List<Mirror.Entry> addressList = null; +public interface RPCService { - /** - * Create a new RPCService backed by the given network and using the given service pattern. - * - * @param mirror The naming server to send queries to. - * @param pattern The pattern to use when querying. - */ - public RPCService(IMirror mirror, String pattern) { - this.mirror = mirror; - this.pattern = pattern; + static RPCService create(IMirror mirror, String pattern) { + if (pattern.startsWith("tcp/")) { + return new TcpRPCService(pattern); + } + return new NamedRPCService(mirror, pattern); } /** @@ -38,38 +24,6 @@ public class RPCService { * * @return A concrete service address. */ - public RPCServiceAddress resolve() { - if (pattern.startsWith("tcp/")) { - int pos = pattern.lastIndexOf('/'); - if (pos > 0 && pos < pattern.length() - 1) { - RPCServiceAddress ret = new RPCServiceAddress(pattern, pattern.substring(0, pos)); - if (!ret.isMalformed()) { - return ret; - } - } - } else { - if (addressGen != mirror.updates()) { - addressGen = mirror.updates(); - addressList = mirror.lookup(pattern); - } - if (addressList != null && !addressList.isEmpty()) { - ++addressIdx; - if (addressIdx >= addressList.size()) { - addressIdx = 0; - } - Mirror.Entry entry = addressList.get(addressIdx); - return new RPCServiceAddress(entry.getName(), entry.getSpec()); - } - } - return null; - } + RPCServiceAddress resolve(); - /** - * Returns the pattern used when querying for the naming server for addresses. This is given at construtor time. - * - * @return The service pattern. - */ - String getPattern() { - return pattern; - } } diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCServiceAddress.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCServiceAddress.java index 0a6a58d4e89..1b7bcf01731 100755 --- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCServiceAddress.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCServiceAddress.java @@ -34,6 +34,13 @@ public class RPCServiceAddress implements ServiceAddress { public RPCServiceAddress(String serviceName, String connectionSpec) { this(serviceName, new Spec(connectionSpec)); } + public RPCServiceAddress(RPCServiceAddress blueprint) { + serviceName = blueprint.serviceName; + sessionName = blueprint.sessionName; + connectionSpec = blueprint.connectionSpec; + target = null; + } + @Override public boolean equals(Object obj) { diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCServicePool.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCServicePool.java index abd33d6c9c2..a666a03c401 100755 --- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCServicePool.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCServicePool.java @@ -1,6 +1,8 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.messagebus.network.rpc; +import com.yahoo.concurrent.CopyOnWriteHashMap; + import java.util.LinkedHashMap; import java.util.Map; @@ -12,7 +14,7 @@ import java.util.Map; public class RPCServicePool { private final RPCNetwork net; - private final ThreadLocalCache services = new ThreadLocalCache(); + private final Map<Long, ServiceLRUCache> mapOfServiceCache; private final int maxSize; /** @@ -23,6 +25,7 @@ public class RPCServicePool { */ public RPCServicePool(RPCNetwork net, int maxSize) { this.net = net; + mapOfServiceCache = new CopyOnWriteHashMap<>(); this.maxSize = maxSize; } @@ -34,12 +37,12 @@ public class RPCServicePool { * @return A service address for the given pattern. */ public RPCServiceAddress resolve(String pattern) { - RPCService service = services.get().get(pattern); - if (service == null) { - service = new RPCService(net.getMirror(), pattern); - services.get().put(pattern, service); - } - return service.resolve(); + + return getPerThreadCache().computeIfAbsent(pattern, (key) -> RPCService.create(net.getMirror(), key)).resolve(); + } + + private ServiceLRUCache getPerThreadCache() { + return mapOfServiceCache.computeIfAbsent(Thread.currentThread().getId(), (key) -> new ServiceLRUCache(maxSize)); } /** @@ -49,7 +52,7 @@ public class RPCServicePool { * @return The current size of this pool. */ public int getSize() { - return services.get().size(); + return getPerThreadCache().size(); } /** @@ -59,21 +62,15 @@ public class RPCServicePool { * @return True if a corresponding service is in the pool. */ public boolean hasService(String pattern) { - return services.get().containsKey(pattern); - } - - private class ThreadLocalCache extends ThreadLocal<ServiceLRUCache> { - - @Override - protected ServiceLRUCache initialValue() { - return new ServiceLRUCache(); - } + return getPerThreadCache().containsKey(pattern); } - private class ServiceLRUCache extends LinkedHashMap<String, RPCService> { + private static class ServiceLRUCache extends LinkedHashMap<String, RPCService> { + private final int maxSize; - ServiceLRUCache() { + ServiceLRUCache(int maxSize) { super(16, 0.75f, true); + this.maxSize = maxSize; } @Override diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/TcpRPCService.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/TcpRPCService.java new file mode 100644 index 00000000000..e2fae59b429 --- /dev/null +++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/TcpRPCService.java @@ -0,0 +1,23 @@ +package com.yahoo.messagebus.network.rpc; + +public class TcpRPCService implements RPCService { + private final RPCServiceAddress blueprint; + + TcpRPCService(String pattern) { + if ( ! pattern.startsWith("tcp/")) { + throw new IllegalArgumentException("Expect tcp adress to start with 'tcp/', was: " + pattern); + } + RPCServiceAddress ret = null; + int pos = pattern.lastIndexOf('/'); + if (pos > 0 && pos < pattern.length() - 1) { + ret = new RPCServiceAddress(pattern, pattern.substring(0, pos)); + if ( ret.isMalformed()) { + ret = null; + } + } + blueprint = ret; + } + public RPCServiceAddress resolve() { + return blueprint != null ? new RPCServiceAddress(blueprint) : null; + } +} |