summaryrefslogtreecommitdiffstats
path: root/container-messagebus
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2021-08-27 12:29:35 +0200
committerJon Marius Venstad <venstad@gmail.com>2021-08-27 12:29:35 +0200
commit259af31ddbc9cc470b94504a781aacc29ab22eef (patch)
treee3e91056e6d281a1e7ac8b5f4bb6a852f5a3a94e /container-messagebus
parent420ce830ab41864a54e214098c4711340edd388c (diff)
Lazily initialise session cache and its network
Diffstat (limited to 'container-messagebus')
-rw-r--r--container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/NetworkMultiplexerProvider.java14
-rw-r--r--container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java33
-rw-r--r--container-messagebus/src/test/java/com/yahoo/container/jdisc/messagebus/MbusClientProviderTest.java2
3 files changed, 37 insertions, 12 deletions
diff --git a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/NetworkMultiplexerProvider.java b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/NetworkMultiplexerProvider.java
index 86e482adc48..5e7901b7fae 100644
--- a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/NetworkMultiplexerProvider.java
+++ b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/NetworkMultiplexerProvider.java
@@ -8,6 +8,8 @@ import com.yahoo.messagebus.network.Identity;
import com.yahoo.messagebus.network.NetworkMultiplexer;
import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
+import java.util.function.Supplier;
+
/**
* Injectable component which provides an {@link NetworkMultiplexer}, creating one if needed,
* i.e., the first time this is created in a container--subsequent creations of this will reuse
@@ -18,7 +20,9 @@ import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
*/
public class NetworkMultiplexerProvider {
- private final NetworkMultiplexer net;
+ private final Object monitor = new Object();
+ private final Supplier<NetworkMultiplexer> nets;
+ private NetworkMultiplexer net;
@Inject
public NetworkMultiplexerProvider(NetworkMultiplexerHolder net, ContainerMbusConfig mbusConfig) {
@@ -26,7 +30,7 @@ public class NetworkMultiplexerProvider {
}
public NetworkMultiplexerProvider(NetworkMultiplexerHolder net, ContainerMbusConfig mbusConfig, String identity) {
- this.net = net.get(asParameters(mbusConfig, identity).setSlobrokConfigId(identity));
+ this.nets = () -> net.get(asParameters(mbusConfig, identity).setSlobrokConfigId(identity));
}
public static RPCNetworkParams asParameters(ContainerMbusConfig mbusConfig, SlobroksConfig slobroksConfig, String identity) {
@@ -43,6 +47,10 @@ public class NetworkMultiplexerProvider {
.setOptimization(RPCNetworkParams.Optimization.valueOf(mbusConfig.optimize_for().name()));
}
- public NetworkMultiplexer net() { return net; }
+ public NetworkMultiplexer net() {
+ synchronized (monitor) {
+ return net = net != null ? net : nets.get();
+ }
+ }
}
diff --git a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java
index 3e3902b35aa..66c041e425a 100644
--- a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java
+++ b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java
@@ -34,6 +34,7 @@ import com.yahoo.vespa.config.content.LoadTypeConfig;
import java.util.HashMap;
import java.util.Map;
+import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -55,7 +56,10 @@ public final class SessionCache extends AbstractComponent {
private static final Logger log = Logger.getLogger(SessionCache.class.getName());
- private final SharedMessageBus messageBus;
+ private final Object monitor = new Object();
+ private Supplier<SharedMessageBus> messageBuses;
+ private SharedMessageBus messageBus;
+
private final Object intermediateLock = new Object();
private final Map<String, SharedIntermediateSession> intermediates = new HashMap<>();
private final IntermediateSessionCreator intermediatesCreator = new IntermediateSessionCreator();
@@ -70,12 +74,12 @@ public final class SessionCache extends AbstractComponent {
LoadTypeConfig loadTypeConfig, MessagebusConfig messagebusConfig,
DocumentProtocolPoliciesConfig policiesConfig,
DistributionConfig distributionConfig) {
- this(nets.net(), containerMbusConfig, documentmanagerConfig,
+ this(nets::net, containerMbusConfig, documentmanagerConfig,
loadTypeConfig, messagebusConfig, policiesConfig, distributionConfig);
}
- public SessionCache(NetworkMultiplexer net, ContainerMbusConfig containerMbusConfig,
+ public SessionCache(Supplier<NetworkMultiplexer> net, ContainerMbusConfig containerMbusConfig,
DocumentmanagerConfig documentmanagerConfig,
LoadTypeConfig loadTypeConfig, MessagebusConfig messagebusConfig,
DocumentProtocolPoliciesConfig policiesConfig,
@@ -89,13 +93,26 @@ public final class SessionCache extends AbstractComponent {
distributionConfig));
}
- public SessionCache(NetworkMultiplexer net, ContainerMbusConfig containerMbusConfig,
+ public SessionCache(Supplier<NetworkMultiplexer> net, ContainerMbusConfig containerMbusConfig,
MessagebusConfig messagebusConfig, Protocol protocol) {
- this.messageBus = createSharedMessageBus(net, containerMbusConfig, messagebusConfig, protocol);
+ this.messageBuses = () -> createSharedMessageBus(net.get(), containerMbusConfig, messagebusConfig, protocol);
}
+ @Override
public void deconstruct() {
- messageBus.release();
+ synchronized (monitor) {
+ messageBuses = () -> { throw new IllegalStateException("Session cache already deconstructed"); };
+
+ if (messageBus != null)
+ messageBus.release();
+ }
+ }
+
+ // Lazily create shared message bus.
+ private SharedMessageBus bus() {
+ synchronized (monitor) {
+ return messageBus = messageBus != null ? messageBus : messageBuses.get();
+ }
}
private static SharedMessageBus createSharedMessageBus(NetworkMultiplexer net,
@@ -185,7 +202,7 @@ public final class SessionCache extends AbstractComponent {
@Override
SharedSourceSession create(SourceSessionParams p) {
log.log(Level.FINE, "Creating new source session.");
- return messageBus.newSourceSession(p);
+ return bus().newSourceSession(p);
}
@Override
@@ -205,7 +222,7 @@ public final class SessionCache extends AbstractComponent {
@Override
SharedIntermediateSession create(IntermediateSessionParams p) {
log.log(Level.FINE, "Creating new intermediate session " + p.getName() + "");
- return messageBus.newIntermediateSession(p);
+ return bus().newIntermediateSession(p);
}
@Override
diff --git a/container-messagebus/src/test/java/com/yahoo/container/jdisc/messagebus/MbusClientProviderTest.java b/container-messagebus/src/test/java/com/yahoo/container/jdisc/messagebus/MbusClientProviderTest.java
index 2a1c4e9a15a..abee9293830 100644
--- a/container-messagebus/src/test/java/com/yahoo/container/jdisc/messagebus/MbusClientProviderTest.java
+++ b/container-messagebus/src/test/java/com/yahoo/container/jdisc/messagebus/MbusClientProviderTest.java
@@ -36,7 +36,7 @@ public class MbusClientProviderTest {
}
private void testClient(SessionConfig config) {
- SessionCache cache = new SessionCache(NetworkMultiplexer.dedicated(new NullNetwork()),
+ SessionCache cache = new SessionCache(() -> NetworkMultiplexer.dedicated(new NullNetwork()),
new ContainerMbusConfig.Builder().build(),
new DocumentmanagerConfig.Builder().build(),
new LoadTypeConfig.Builder().build(),