From 6058f5f8d2ed19ca3c0461e7080680a093834823 Mon Sep 17 00:00:00 2001 From: Jon Marius Venstad Date: Mon, 2 May 2022 21:08:17 +0200 Subject: Revert "Jonmv/remove last controller jersey client [run-systemtest]" --- .../jdisc/messagebus/NetworkMultiplexerHolder.java | 27 ++++++++++++++-------- .../container/jdisc/messagebus/SessionCache.java | 19 ++++++++++----- 2 files changed, 30 insertions(+), 16 deletions(-) (limited to 'container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus') diff --git a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/NetworkMultiplexerHolder.java b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/NetworkMultiplexerHolder.java index 8b3b2664cd1..cb28807ac73 100644 --- a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/NetworkMultiplexerHolder.java +++ b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/NetworkMultiplexerHolder.java @@ -7,9 +7,6 @@ import com.yahoo.messagebus.network.NetworkMultiplexer; import com.yahoo.messagebus.network.rpc.RPCNetwork; import com.yahoo.messagebus.network.rpc.RPCNetworkParams; import com.yahoo.messagebus.shared.NullNetwork; -import com.yahoo.yolean.concurrent.Memoized; - -import java.util.concurrent.atomic.AtomicReference; /** * Holds a reference to a singleton {@link NetworkMultiplexer}. @@ -18,17 +15,21 @@ import java.util.concurrent.atomic.AtomicReference; */ public class NetworkMultiplexerHolder extends AbstractComponent { - private final AtomicReference params = new AtomicReference<>(); - private final Memoized net = new Memoized<>(() -> NetworkMultiplexer.shared(newNetwork(params.get())), - NetworkMultiplexer::disown); + private final Object monitor = new Object(); + private boolean destroyed = false; + private NetworkMultiplexer net; /** Get the singleton RPCNetworkAdapter, creating it if this hasn't yet been done. */ public NetworkMultiplexer get(RPCNetworkParams params) { - this.params.set(params); - return net.get(); + synchronized (monitor) { + if (destroyed) + throw new IllegalStateException("Component already destroyed"); + + return net = net != null ? net : NetworkMultiplexer.shared(newNetwork(params)); + } } - private static Network newNetwork(RPCNetworkParams params) { + private Network newNetwork(RPCNetworkParams params) { return params.getSlobroksConfig() != null && params.getSlobroksConfig().slobrok().isEmpty() ? new NullNetwork() // For LocalApplication, test setup. : new RPCNetwork(params); @@ -36,7 +37,13 @@ public class NetworkMultiplexerHolder extends AbstractComponent { @Override public void deconstruct() { - net.close(); + synchronized (monitor) { + if (net != null) { + net.disown(); + net = null; + } + destroyed = true; + } } } diff --git a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java index 3045247a6d3..91a3181cb68 100644 --- a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java +++ b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java @@ -29,7 +29,6 @@ import com.yahoo.messagebus.shared.SharedMessageBus; import com.yahoo.messagebus.shared.SharedSourceSession; import com.yahoo.vespa.config.content.DistributionConfig; import com.yahoo.vespa.config.content.LoadTypeConfig; -import com.yahoo.yolean.concurrent.Memoized; import java.util.HashMap; import java.util.Map; @@ -55,7 +54,9 @@ public final class SessionCache extends AbstractComponent { private static final Logger log = Logger.getLogger(SessionCache.class.getName()); - private final Memoized messageBus; + private final Object monitor = new Object(); + private Supplier messageBuses; + private SharedMessageBus messageBus; private final Object intermediateLock = new Object(); private final Map intermediates = new HashMap<>(); @@ -95,18 +96,24 @@ public final class SessionCache extends AbstractComponent { public SessionCache(Supplier net, ContainerMbusConfig containerMbusConfig, MessagebusConfig messagebusConfig, Protocol protocol) { - this.messageBus = new Memoized<>(() -> createSharedMessageBus(net.get(), containerMbusConfig, messagebusConfig, protocol), - SharedMessageBus::release); + this.messageBuses = () -> createSharedMessageBus(net.get(), containerMbusConfig, messagebusConfig, protocol); } @Override public void deconstruct() { - messageBus.close(); + synchronized (monitor) { + messageBuses = () -> { throw new IllegalStateException("Session cache already deconstructed"); }; + + if (messageBus != null) + messageBus.release(); + } } // Lazily create shared message bus. private SharedMessageBus bus() { - return messageBus.get(); + synchronized (monitor) { + return messageBus = messageBus != null ? messageBus : messageBuses.get(); + } } private static SharedMessageBus createSharedMessageBus(NetworkMultiplexer net, -- cgit v1.2.3