diff options
author | Jon Marius Venstad <venstad@gmail.com> | 2021-08-16 21:00:25 +0200 |
---|---|---|
committer | Jon Marius Venstad <venstad@gmail.com> | 2021-08-16 21:01:03 +0200 |
commit | 24094828e39ba844f8724ab9f9f7c3e9209c2122 (patch) | |
tree | e5683f8da8708d8d74bcaaf647cae722c5cea55c /container-messagebus | |
parent | c5b3b3400d8cdd533008148cab04a1da0838bae1 (diff) |
Use injectable network multiplexer holder in SessionCache to bridge generations
Diffstat (limited to 'container-messagebus')
5 files changed, 120 insertions, 39 deletions
diff --git a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/NetworkMultiplexerHolder.java b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/NetworkMultiplexerHolder.java new file mode 100644 index 00000000000..9020bfaffe3 --- /dev/null +++ b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/NetworkMultiplexerHolder.java @@ -0,0 +1,45 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.container.jdisc.messagebus; + +import com.yahoo.component.AbstractComponent; +import com.yahoo.messagebus.network.Network; +import com.yahoo.messagebus.network.NetworkMultiplexer; +import com.yahoo.messagebus.network.rpc.RPCNetwork; +import com.yahoo.messagebus.network.rpc.RPCNetworkParams; +import com.yahoo.messagebus.shared.NullNetwork; + +/** + * Holds a reference to a singleton {@link NetworkMultiplexer}. + * + * @author jonmv + */ +public class NetworkMultiplexerHolder extends AbstractComponent { + + private final Object monitor = new Object(); + private boolean destroyed = false; + private NetworkMultiplexer net; + + /** Get the singleton RPCNetworkAdapter, creating it if this hasn't yet been done. */ + public NetworkMultiplexer get(RPCNetworkParams params) { + synchronized (monitor) { + if (destroyed) + throw new IllegalStateException("Component already destroyed"); + + return net = net != null ? net : NetworkMultiplexer.shared(newNetwork(params)); + } + } + + private Network newNetwork(RPCNetworkParams params) { + return params.getSlobroksConfig().slobrok().isEmpty() ? new NullNetwork() : new RPCNetwork(params); + } + + @Override + public void deconstruct() { + synchronized (monitor) { + net.destroy(); + net = null; + destroyed = true; + } + } + +} diff --git a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/NetworkMultiplexerProvider.java b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/NetworkMultiplexerProvider.java new file mode 100644 index 00000000000..650ac92e779 --- /dev/null +++ b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/NetworkMultiplexerProvider.java @@ -0,0 +1,44 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.container.jdisc.messagebus; + +import com.google.inject.Inject; +import com.yahoo.cloud.config.SlobroksConfig; +import com.yahoo.container.jdisc.ContainerMbusConfig; +import com.yahoo.messagebus.network.Identity; +import com.yahoo.messagebus.network.NetworkMultiplexer; +import com.yahoo.messagebus.network.rpc.RPCNetworkParams; + +/** + * Injectable component which provides an {@link NetworkMultiplexer}, creating one if needed, + * i.e., the first time this is created in a container--subsequent creations of this will reuse + * the underlying network that was created initially. This breaks the DI pattern, but must be done + * because the network is a unique resource which cannot exist in several versions simultaneously. + * + * @author jonmv + */ +public class NetworkMultiplexerProvider { + + private final NetworkMultiplexer net; + + @Inject + public NetworkMultiplexerProvider(NetworkMultiplexerHolder net, ContainerMbusConfig mbusConfig, SlobroksConfig slobroksConfig) { + this(net, mbusConfig, slobroksConfig, System.getProperty("config.id")); //: + } + + public NetworkMultiplexerProvider(NetworkMultiplexerHolder net, ContainerMbusConfig mbusConfig, SlobroksConfig slobroksConfig, String identity) { + this.net = net.get(asParameters(mbusConfig, slobroksConfig, identity)); + } + + public static RPCNetworkParams asParameters(ContainerMbusConfig mbusConfig, SlobroksConfig slobroksConfig, String identity) { + return new RPCNetworkParams().setSlobroksConfig(slobroksConfig) + .setIdentity(new Identity(identity)) + .setListenPort(mbusConfig.port()) + .setNumTargetsPerSpec(mbusConfig.numconnectionspertarget()) + .setNumNetworkThreads(mbusConfig.numthreads()) + .setTransportEventsBeforeWakeup(mbusConfig.transport_events_before_wakeup()) + .setOptimization(RPCNetworkParams.Optimization.valueOf(mbusConfig.optimize_for().name())); + } + + public NetworkMultiplexer net() { return net; } + +} diff --git a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java index 850cfc0b4bf..3e3902b35aa 100644 --- a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java +++ b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java @@ -18,14 +18,14 @@ import com.yahoo.jdisc.SharedResource; import com.yahoo.messagebus.ConfigAgent; import com.yahoo.messagebus.DynamicThrottlePolicy; import com.yahoo.messagebus.IntermediateSessionParams; +import com.yahoo.messagebus.MessageBus; import com.yahoo.messagebus.MessageBusParams; import com.yahoo.messagebus.MessagebusConfig; import com.yahoo.messagebus.Protocol; import com.yahoo.messagebus.SourceSessionParams; import com.yahoo.messagebus.StaticThrottlePolicy; import com.yahoo.messagebus.ThrottlePolicy; -import com.yahoo.messagebus.network.Identity; -import com.yahoo.messagebus.network.rpc.RPCNetworkParams; +import com.yahoo.messagebus.network.NetworkMultiplexer; import com.yahoo.messagebus.shared.SharedIntermediateSession; import com.yahoo.messagebus.shared.SharedMessageBus; import com.yahoo.messagebus.shared.SharedSourceSession; @@ -65,44 +65,42 @@ public final class SessionCache extends AbstractComponent { private final SourceSessionCreator sourcesCreator = new SourceSessionCreator(); @Inject - public SessionCache(ContainerMbusConfig containerMbusConfig, DocumentmanagerConfig documentmanagerConfig, - LoadTypeConfig loadTypeConfig, SlobroksConfig slobroksConfig, - MessagebusConfig messagebusConfig, DocumentProtocolPoliciesConfig policiesConfig, + public SessionCache(NetworkMultiplexerProvider nets, ContainerMbusConfig containerMbusConfig, + DocumentmanagerConfig documentmanagerConfig, + LoadTypeConfig loadTypeConfig, MessagebusConfig messagebusConfig, + DocumentProtocolPoliciesConfig policiesConfig, DistributionConfig distributionConfig) { - this(containerMbusConfig, documentmanagerConfig, loadTypeConfig, slobroksConfig, - messagebusConfig, policiesConfig, distributionConfig, System.getProperty("config.id")); //: + this(nets.net(), containerMbusConfig, documentmanagerConfig, + loadTypeConfig, messagebusConfig, policiesConfig, distributionConfig); + } - public SessionCache(ContainerMbusConfig containerMbusConfig, DocumentmanagerConfig documentmanagerConfig, - LoadTypeConfig loadTypeConfig, SlobroksConfig slobroksConfig, - MessagebusConfig messagebusConfig, DocumentProtocolPoliciesConfig policiesConfig, - DistributionConfig distributionConfig, String identity) { - this(containerMbusConfig, - slobroksConfig, + public SessionCache(NetworkMultiplexer net, ContainerMbusConfig containerMbusConfig, + DocumentmanagerConfig documentmanagerConfig, + LoadTypeConfig loadTypeConfig, MessagebusConfig messagebusConfig, + DocumentProtocolPoliciesConfig policiesConfig, + DistributionConfig distributionConfig) { + this(net, + containerMbusConfig, messagebusConfig, - identity, new DocumentProtocol(new DocumentTypeManager(documentmanagerConfig), new LoadTypeSet(loadTypeConfig), policiesConfig, distributionConfig)); } - public SessionCache(ContainerMbusConfig containerMbusConfig, SlobroksConfig slobroksConfig, - MessagebusConfig messagebusConfig, String identity, Protocol protocol) { - this.messageBus = createSharedMessageBus(containerMbusConfig, - messagebusConfig, - slobroksConfig, - identity, - protocol); + public SessionCache(NetworkMultiplexer net, ContainerMbusConfig containerMbusConfig, + MessagebusConfig messagebusConfig, Protocol protocol) { + this.messageBus = createSharedMessageBus(net, containerMbusConfig, messagebusConfig, protocol); } public void deconstruct() { messageBus.release(); } - private static SharedMessageBus createSharedMessageBus(ContainerMbusConfig mbusConfig, + private static SharedMessageBus createSharedMessageBus(NetworkMultiplexer net, + ContainerMbusConfig mbusConfig, MessagebusConfig messagebusConfig, - SlobroksConfig slobroksConfig, String identity, Protocol protocol) { MessageBusParams mbusParams = new MessageBusParams().addProtocol(protocol); @@ -114,17 +112,9 @@ public final class SessionCache extends AbstractComponent { mbusParams.setMaxPendingCount(mbusConfig.maxpendingcount()); mbusParams.setMaxPendingSize(maxPendingSize); - RPCNetworkParams netParams = new RPCNetworkParams() - .setSlobroksConfig(slobroksConfig) - .setIdentity(new Identity(identity)) - .setListenPort(mbusConfig.port()) - .setNumTargetsPerSpec(mbusConfig.numconnectionspertarget()) - .setNumNetworkThreads(mbusConfig.numthreads()) - .setTransportEventsBeforeWakeup(mbusConfig.transport_events_before_wakeup()) - .setOptimization(RPCNetworkParams.Optimization.valueOf(mbusConfig.optimize_for().name())); - SharedMessageBus bus = SharedMessageBus.newInstance(mbusParams, netParams); - new ConfigAgent(messagebusConfig, bus.messageBus()); // Configure the wrapped MessageBus with a routing table. - return bus; + MessageBus bus = new MessageBus(net, mbusParams); + new ConfigAgent(messagebusConfig, bus); // Configure the wrapped MessageBus with a routing table. + return new SharedMessageBus(bus); } private static void logSystemInfo(ContainerMbusConfig containerMbusConfig, long maxPendingSize) { diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/shared/NullNetwork.java b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/NullNetwork.java index bef3fdc1e34..a7cf5fec7d1 100644 --- a/container-messagebus/src/main/java/com/yahoo/messagebus/shared/NullNetwork.java +++ b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/NullNetwork.java @@ -14,7 +14,7 @@ import java.util.List; * * @author Vegard Havdal */ -class NullNetwork implements Network { +public class NullNetwork implements Network { @Override public boolean waitUntilReady(double seconds) { diff --git a/container-messagebus/src/test/java/com/yahoo/container/jdisc/messagebus/MbusClientProviderTest.java b/container-messagebus/src/test/java/com/yahoo/container/jdisc/messagebus/MbusClientProviderTest.java index 9512c1a4873..d36234b0ccb 100644 --- a/container-messagebus/src/test/java/com/yahoo/container/jdisc/messagebus/MbusClientProviderTest.java +++ b/container-messagebus/src/test/java/com/yahoo/container/jdisc/messagebus/MbusClientProviderTest.java @@ -9,6 +9,9 @@ import com.yahoo.container.jdisc.messagebus.SessionCache; import com.yahoo.document.config.DocumentmanagerConfig; import com.yahoo.documentapi.messagebus.protocol.DocumentProtocolPoliciesConfig; import com.yahoo.messagebus.MessagebusConfig; +import com.yahoo.messagebus.network.NetworkMultiplexer; +import com.yahoo.messagebus.network.rpc.RPCNetwork; +import com.yahoo.messagebus.shared.NullNetwork; import com.yahoo.vespa.config.content.DistributionConfig; import com.yahoo.vespa.config.content.LoadTypeConfig; import org.junit.Test; @@ -37,14 +40,13 @@ public class MbusClientProviderTest { } private void testClient(SessionConfig config) { - SessionCache cache = new SessionCache(new ContainerMbusConfig.Builder().build(), + SessionCache cache = new SessionCache(NetworkMultiplexer.dedicated(new NullNetwork()), + new ContainerMbusConfig.Builder().build(), new DocumentmanagerConfig.Builder().build(), new LoadTypeConfig.Builder().build(), - new SlobroksConfig.Builder().build(), new MessagebusConfig.Builder().build(), new DocumentProtocolPoliciesConfig.Builder().build(), - new DistributionConfig.Builder().build(), - "test"); + new DistributionConfig.Builder().build()); MbusClientProvider p = new MbusClientProvider(cache, config); assertNotNull(p.get()); p.deconstruct(); |