diff options
author | Jon Marius Venstad <venstad@gmail.com> | 2021-08-27 12:29:35 +0200 |
---|---|---|
committer | Jon Marius Venstad <venstad@gmail.com> | 2021-08-27 12:29:35 +0200 |
commit | 259af31ddbc9cc470b94504a781aacc29ab22eef (patch) | |
tree | e3e91056e6d281a1e7ac8b5f4bb6a852f5a3a94e /container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus | |
parent | 420ce830ab41864a54e214098c4711340edd388c (diff) |
Lazily initialise session cache and its network
Diffstat (limited to 'container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus')
2 files changed, 36 insertions, 11 deletions
diff --git a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/NetworkMultiplexerProvider.java b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/NetworkMultiplexerProvider.java index 86e482adc48..5e7901b7fae 100644 --- a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/NetworkMultiplexerProvider.java +++ b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/NetworkMultiplexerProvider.java @@ -8,6 +8,8 @@ import com.yahoo.messagebus.network.Identity; import com.yahoo.messagebus.network.NetworkMultiplexer; import com.yahoo.messagebus.network.rpc.RPCNetworkParams; +import java.util.function.Supplier; + /** * Injectable component which provides an {@link NetworkMultiplexer}, creating one if needed, * i.e., the first time this is created in a container--subsequent creations of this will reuse @@ -18,7 +20,9 @@ import com.yahoo.messagebus.network.rpc.RPCNetworkParams; */ public class NetworkMultiplexerProvider { - private final NetworkMultiplexer net; + private final Object monitor = new Object(); + private final Supplier<NetworkMultiplexer> nets; + private NetworkMultiplexer net; @Inject public NetworkMultiplexerProvider(NetworkMultiplexerHolder net, ContainerMbusConfig mbusConfig) { @@ -26,7 +30,7 @@ public class NetworkMultiplexerProvider { } public NetworkMultiplexerProvider(NetworkMultiplexerHolder net, ContainerMbusConfig mbusConfig, String identity) { - this.net = net.get(asParameters(mbusConfig, identity).setSlobrokConfigId(identity)); + this.nets = () -> net.get(asParameters(mbusConfig, identity).setSlobrokConfigId(identity)); } public static RPCNetworkParams asParameters(ContainerMbusConfig mbusConfig, SlobroksConfig slobroksConfig, String identity) { @@ -43,6 +47,10 @@ public class NetworkMultiplexerProvider { .setOptimization(RPCNetworkParams.Optimization.valueOf(mbusConfig.optimize_for().name())); } - public NetworkMultiplexer net() { return net; } + public NetworkMultiplexer net() { + synchronized (monitor) { + return net = net != null ? net : nets.get(); + } + } } diff --git a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java index 3e3902b35aa..66c041e425a 100644 --- a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java +++ b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java @@ -34,6 +34,7 @@ import com.yahoo.vespa.config.content.LoadTypeConfig; import java.util.HashMap; import java.util.Map; +import java.util.function.Supplier; import java.util.logging.Level; import java.util.logging.Logger; @@ -55,7 +56,10 @@ public final class SessionCache extends AbstractComponent { private static final Logger log = Logger.getLogger(SessionCache.class.getName()); - private final SharedMessageBus messageBus; + private final Object monitor = new Object(); + private Supplier<SharedMessageBus> messageBuses; + private SharedMessageBus messageBus; + private final Object intermediateLock = new Object(); private final Map<String, SharedIntermediateSession> intermediates = new HashMap<>(); private final IntermediateSessionCreator intermediatesCreator = new IntermediateSessionCreator(); @@ -70,12 +74,12 @@ public final class SessionCache extends AbstractComponent { LoadTypeConfig loadTypeConfig, MessagebusConfig messagebusConfig, DocumentProtocolPoliciesConfig policiesConfig, DistributionConfig distributionConfig) { - this(nets.net(), containerMbusConfig, documentmanagerConfig, + this(nets::net, containerMbusConfig, documentmanagerConfig, loadTypeConfig, messagebusConfig, policiesConfig, distributionConfig); } - public SessionCache(NetworkMultiplexer net, ContainerMbusConfig containerMbusConfig, + public SessionCache(Supplier<NetworkMultiplexer> net, ContainerMbusConfig containerMbusConfig, DocumentmanagerConfig documentmanagerConfig, LoadTypeConfig loadTypeConfig, MessagebusConfig messagebusConfig, DocumentProtocolPoliciesConfig policiesConfig, @@ -89,13 +93,26 @@ public final class SessionCache extends AbstractComponent { distributionConfig)); } - public SessionCache(NetworkMultiplexer net, ContainerMbusConfig containerMbusConfig, + public SessionCache(Supplier<NetworkMultiplexer> net, ContainerMbusConfig containerMbusConfig, MessagebusConfig messagebusConfig, Protocol protocol) { - this.messageBus = createSharedMessageBus(net, containerMbusConfig, messagebusConfig, protocol); + this.messageBuses = () -> createSharedMessageBus(net.get(), containerMbusConfig, messagebusConfig, protocol); } + @Override public void deconstruct() { - messageBus.release(); + synchronized (monitor) { + messageBuses = () -> { throw new IllegalStateException("Session cache already deconstructed"); }; + + if (messageBus != null) + messageBus.release(); + } + } + + // Lazily create shared message bus. + private SharedMessageBus bus() { + synchronized (monitor) { + return messageBus = messageBus != null ? messageBus : messageBuses.get(); + } } private static SharedMessageBus createSharedMessageBus(NetworkMultiplexer net, @@ -185,7 +202,7 @@ public final class SessionCache extends AbstractComponent { @Override SharedSourceSession create(SourceSessionParams p) { log.log(Level.FINE, "Creating new source session."); - return messageBus.newSourceSession(p); + return bus().newSourceSession(p); } @Override @@ -205,7 +222,7 @@ public final class SessionCache extends AbstractComponent { @Override SharedIntermediateSession create(IntermediateSessionParams p) { log.log(Level.FINE, "Creating new intermediate session " + p.getName() + ""); - return messageBus.newIntermediateSession(p); + return bus().newIntermediateSession(p); } @Override |