From 0ad6f7f4892816c84542bf0dbe6fbc7c61febd2d Mon Sep 17 00:00:00 2001 From: jonmv Date: Fri, 29 Apr 2022 20:18:30 +0200 Subject: Simplify shared resource setup with Memoized --- .../jdisc/messagebus/NetworkMultiplexerHolder.java | 27 ++++++++-------------- .../container/jdisc/messagebus/SessionCache.java | 18 +++++---------- 2 files changed, 16 insertions(+), 29 deletions(-) (limited to 'container-messagebus/src') diff --git a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/NetworkMultiplexerHolder.java b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/NetworkMultiplexerHolder.java index cb28807ac73..8b3b2664cd1 100644 --- a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/NetworkMultiplexerHolder.java +++ b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/NetworkMultiplexerHolder.java @@ -7,6 +7,9 @@ import com.yahoo.messagebus.network.NetworkMultiplexer; import com.yahoo.messagebus.network.rpc.RPCNetwork; import com.yahoo.messagebus.network.rpc.RPCNetworkParams; import com.yahoo.messagebus.shared.NullNetwork; +import com.yahoo.yolean.concurrent.Memoized; + +import java.util.concurrent.atomic.AtomicReference; /** * Holds a reference to a singleton {@link NetworkMultiplexer}. @@ -15,21 +18,17 @@ import com.yahoo.messagebus.shared.NullNetwork; */ public class NetworkMultiplexerHolder extends AbstractComponent { - private final Object monitor = new Object(); - private boolean destroyed = false; - private NetworkMultiplexer net; + private final AtomicReference params = new AtomicReference<>(); + private final Memoized net = new Memoized<>(() -> NetworkMultiplexer.shared(newNetwork(params.get())), + NetworkMultiplexer::disown); /** Get the singleton RPCNetworkAdapter, creating it if this hasn't yet been done. */ public NetworkMultiplexer get(RPCNetworkParams params) { - synchronized (monitor) { - if (destroyed) - throw new IllegalStateException("Component already destroyed"); - - return net = net != null ? net : NetworkMultiplexer.shared(newNetwork(params)); - } + this.params.set(params); + return net.get(); } - private Network newNetwork(RPCNetworkParams params) { + private static Network newNetwork(RPCNetworkParams params) { return params.getSlobroksConfig() != null && params.getSlobroksConfig().slobrok().isEmpty() ? new NullNetwork() // For LocalApplication, test setup. : new RPCNetwork(params); @@ -37,13 +36,7 @@ public class NetworkMultiplexerHolder extends AbstractComponent { @Override public void deconstruct() { - synchronized (monitor) { - if (net != null) { - net.disown(); - net = null; - } - destroyed = true; - } + net.close(); } } diff --git a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java index 91a3181cb68..612eb8bb8e0 100644 --- a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java +++ b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java @@ -29,6 +29,7 @@ import com.yahoo.messagebus.shared.SharedMessageBus; import com.yahoo.messagebus.shared.SharedSourceSession; import com.yahoo.vespa.config.content.DistributionConfig; import com.yahoo.vespa.config.content.LoadTypeConfig; +import com.yahoo.yolean.concurrent.Memoized; import java.util.HashMap; import java.util.Map; @@ -55,8 +56,7 @@ public final class SessionCache extends AbstractComponent { private static final Logger log = Logger.getLogger(SessionCache.class.getName()); private final Object monitor = new Object(); - private Supplier messageBuses; - private SharedMessageBus messageBus; + private Memoized messageBus; private final Object intermediateLock = new Object(); private final Map intermediates = new HashMap<>(); @@ -96,24 +96,18 @@ public final class SessionCache extends AbstractComponent { public SessionCache(Supplier net, ContainerMbusConfig containerMbusConfig, MessagebusConfig messagebusConfig, Protocol protocol) { - this.messageBuses = () -> createSharedMessageBus(net.get(), containerMbusConfig, messagebusConfig, protocol); + this.messageBus = new Memoized<>(() -> createSharedMessageBus(net.get(), containerMbusConfig, messagebusConfig, protocol), + SharedMessageBus::release); } @Override public void deconstruct() { - synchronized (monitor) { - messageBuses = () -> { throw new IllegalStateException("Session cache already deconstructed"); }; - - if (messageBus != null) - messageBus.release(); - } + messageBus.close(); } // Lazily create shared message bus. private SharedMessageBus bus() { - synchronized (monitor) { - return messageBus = messageBus != null ? messageBus : messageBuses.get(); - } + return messageBus.get(); } private static SharedMessageBus createSharedMessageBus(NetworkMultiplexer net, -- cgit v1.2.3