aboutsummaryrefslogtreecommitdiffstats
path: root/container-messagebus
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2021-08-16 21:00:25 +0200
committerJon Marius Venstad <venstad@gmail.com>2021-08-16 21:01:03 +0200
commit24094828e39ba844f8724ab9f9f7c3e9209c2122 (patch)
treee5683f8da8708d8d74bcaaf647cae722c5cea55c /container-messagebus
parentc5b3b3400d8cdd533008148cab04a1da0838bae1 (diff)
Use injectable network multiplexer holder in SessionCache to bridge generations
Diffstat (limited to 'container-messagebus')
-rw-r--r--container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/NetworkMultiplexerHolder.java45
-rw-r--r--container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/NetworkMultiplexerProvider.java44
-rw-r--r--container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java58
-rw-r--r--container-messagebus/src/main/java/com/yahoo/messagebus/shared/NullNetwork.java2
-rw-r--r--container-messagebus/src/test/java/com/yahoo/container/jdisc/messagebus/MbusClientProviderTest.java10
5 files changed, 120 insertions, 39 deletions
diff --git a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/NetworkMultiplexerHolder.java b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/NetworkMultiplexerHolder.java
new file mode 100644
index 00000000000..9020bfaffe3
--- /dev/null
+++ b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/NetworkMultiplexerHolder.java
@@ -0,0 +1,45 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.container.jdisc.messagebus;
+
+import com.yahoo.component.AbstractComponent;
+import com.yahoo.messagebus.network.Network;
+import com.yahoo.messagebus.network.NetworkMultiplexer;
+import com.yahoo.messagebus.network.rpc.RPCNetwork;
+import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
+import com.yahoo.messagebus.shared.NullNetwork;
+
+/**
+ * Holds a reference to a singleton {@link NetworkMultiplexer}.
+ *
+ * @author jonmv
+ */
+public class NetworkMultiplexerHolder extends AbstractComponent {
+
+ private final Object monitor = new Object();
+ private boolean destroyed = false;
+ private NetworkMultiplexer net;
+
+ /** Get the singleton RPCNetworkAdapter, creating it if this hasn't yet been done. */
+ public NetworkMultiplexer get(RPCNetworkParams params) {
+ synchronized (monitor) {
+ if (destroyed)
+ throw new IllegalStateException("Component already destroyed");
+
+ return net = net != null ? net : NetworkMultiplexer.shared(newNetwork(params));
+ }
+ }
+
+ private Network newNetwork(RPCNetworkParams params) {
+ return params.getSlobroksConfig().slobrok().isEmpty() ? new NullNetwork() : new RPCNetwork(params);
+ }
+
+ @Override
+ public void deconstruct() {
+ synchronized (monitor) {
+ net.destroy();
+ net = null;
+ destroyed = true;
+ }
+ }
+
+}
diff --git a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/NetworkMultiplexerProvider.java b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/NetworkMultiplexerProvider.java
new file mode 100644
index 00000000000..650ac92e779
--- /dev/null
+++ b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/NetworkMultiplexerProvider.java
@@ -0,0 +1,44 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.container.jdisc.messagebus;
+
+import com.google.inject.Inject;
+import com.yahoo.cloud.config.SlobroksConfig;
+import com.yahoo.container.jdisc.ContainerMbusConfig;
+import com.yahoo.messagebus.network.Identity;
+import com.yahoo.messagebus.network.NetworkMultiplexer;
+import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
+
+/**
+ * Injectable component which provides an {@link NetworkMultiplexer}, creating one if needed,
+ * i.e., the first time this is created in a container--subsequent creations of this will reuse
+ * the underlying network that was created initially. This breaks the DI pattern, but must be done
+ * because the network is a unique resource which cannot exist in several versions simultaneously.
+ *
+ * @author jonmv
+ */
+public class NetworkMultiplexerProvider {
+
+ private final NetworkMultiplexer net;
+
+ @Inject
+ public NetworkMultiplexerProvider(NetworkMultiplexerHolder net, ContainerMbusConfig mbusConfig, SlobroksConfig slobroksConfig) {
+ this(net, mbusConfig, slobroksConfig, System.getProperty("config.id")); //:
+ }
+
+ public NetworkMultiplexerProvider(NetworkMultiplexerHolder net, ContainerMbusConfig mbusConfig, SlobroksConfig slobroksConfig, String identity) {
+ this.net = net.get(asParameters(mbusConfig, slobroksConfig, identity));
+ }
+
+ public static RPCNetworkParams asParameters(ContainerMbusConfig mbusConfig, SlobroksConfig slobroksConfig, String identity) {
+ return new RPCNetworkParams().setSlobroksConfig(slobroksConfig)
+ .setIdentity(new Identity(identity))
+ .setListenPort(mbusConfig.port())
+ .setNumTargetsPerSpec(mbusConfig.numconnectionspertarget())
+ .setNumNetworkThreads(mbusConfig.numthreads())
+ .setTransportEventsBeforeWakeup(mbusConfig.transport_events_before_wakeup())
+ .setOptimization(RPCNetworkParams.Optimization.valueOf(mbusConfig.optimize_for().name()));
+ }
+
+ public NetworkMultiplexer net() { return net; }
+
+}
diff --git a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java
index 850cfc0b4bf..3e3902b35aa 100644
--- a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java
+++ b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java
@@ -18,14 +18,14 @@ import com.yahoo.jdisc.SharedResource;
import com.yahoo.messagebus.ConfigAgent;
import com.yahoo.messagebus.DynamicThrottlePolicy;
import com.yahoo.messagebus.IntermediateSessionParams;
+import com.yahoo.messagebus.MessageBus;
import com.yahoo.messagebus.MessageBusParams;
import com.yahoo.messagebus.MessagebusConfig;
import com.yahoo.messagebus.Protocol;
import com.yahoo.messagebus.SourceSessionParams;
import com.yahoo.messagebus.StaticThrottlePolicy;
import com.yahoo.messagebus.ThrottlePolicy;
-import com.yahoo.messagebus.network.Identity;
-import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
+import com.yahoo.messagebus.network.NetworkMultiplexer;
import com.yahoo.messagebus.shared.SharedIntermediateSession;
import com.yahoo.messagebus.shared.SharedMessageBus;
import com.yahoo.messagebus.shared.SharedSourceSession;
@@ -65,44 +65,42 @@ public final class SessionCache extends AbstractComponent {
private final SourceSessionCreator sourcesCreator = new SourceSessionCreator();
@Inject
- public SessionCache(ContainerMbusConfig containerMbusConfig, DocumentmanagerConfig documentmanagerConfig,
- LoadTypeConfig loadTypeConfig, SlobroksConfig slobroksConfig,
- MessagebusConfig messagebusConfig, DocumentProtocolPoliciesConfig policiesConfig,
+ public SessionCache(NetworkMultiplexerProvider nets, ContainerMbusConfig containerMbusConfig,
+ DocumentmanagerConfig documentmanagerConfig,
+ LoadTypeConfig loadTypeConfig, MessagebusConfig messagebusConfig,
+ DocumentProtocolPoliciesConfig policiesConfig,
DistributionConfig distributionConfig) {
- this(containerMbusConfig, documentmanagerConfig, loadTypeConfig, slobroksConfig,
- messagebusConfig, policiesConfig, distributionConfig, System.getProperty("config.id")); //:
+ this(nets.net(), containerMbusConfig, documentmanagerConfig,
+ loadTypeConfig, messagebusConfig, policiesConfig, distributionConfig);
+
}
- public SessionCache(ContainerMbusConfig containerMbusConfig, DocumentmanagerConfig documentmanagerConfig,
- LoadTypeConfig loadTypeConfig, SlobroksConfig slobroksConfig,
- MessagebusConfig messagebusConfig, DocumentProtocolPoliciesConfig policiesConfig,
- DistributionConfig distributionConfig, String identity) {
- this(containerMbusConfig,
- slobroksConfig,
+ public SessionCache(NetworkMultiplexer net, ContainerMbusConfig containerMbusConfig,
+ DocumentmanagerConfig documentmanagerConfig,
+ LoadTypeConfig loadTypeConfig, MessagebusConfig messagebusConfig,
+ DocumentProtocolPoliciesConfig policiesConfig,
+ DistributionConfig distributionConfig) {
+ this(net,
+ containerMbusConfig,
messagebusConfig,
- identity,
new DocumentProtocol(new DocumentTypeManager(documentmanagerConfig),
new LoadTypeSet(loadTypeConfig),
policiesConfig,
distributionConfig));
}
- public SessionCache(ContainerMbusConfig containerMbusConfig, SlobroksConfig slobroksConfig,
- MessagebusConfig messagebusConfig, String identity, Protocol protocol) {
- this.messageBus = createSharedMessageBus(containerMbusConfig,
- messagebusConfig,
- slobroksConfig,
- identity,
- protocol);
+ public SessionCache(NetworkMultiplexer net, ContainerMbusConfig containerMbusConfig,
+ MessagebusConfig messagebusConfig, Protocol protocol) {
+ this.messageBus = createSharedMessageBus(net, containerMbusConfig, messagebusConfig, protocol);
}
public void deconstruct() {
messageBus.release();
}
- private static SharedMessageBus createSharedMessageBus(ContainerMbusConfig mbusConfig,
+ private static SharedMessageBus createSharedMessageBus(NetworkMultiplexer net,
+ ContainerMbusConfig mbusConfig,
MessagebusConfig messagebusConfig,
- SlobroksConfig slobroksConfig, String identity,
Protocol protocol) {
MessageBusParams mbusParams = new MessageBusParams().addProtocol(protocol);
@@ -114,17 +112,9 @@ public final class SessionCache extends AbstractComponent {
mbusParams.setMaxPendingCount(mbusConfig.maxpendingcount());
mbusParams.setMaxPendingSize(maxPendingSize);
- RPCNetworkParams netParams = new RPCNetworkParams()
- .setSlobroksConfig(slobroksConfig)
- .setIdentity(new Identity(identity))
- .setListenPort(mbusConfig.port())
- .setNumTargetsPerSpec(mbusConfig.numconnectionspertarget())
- .setNumNetworkThreads(mbusConfig.numthreads())
- .setTransportEventsBeforeWakeup(mbusConfig.transport_events_before_wakeup())
- .setOptimization(RPCNetworkParams.Optimization.valueOf(mbusConfig.optimize_for().name()));
- SharedMessageBus bus = SharedMessageBus.newInstance(mbusParams, netParams);
- new ConfigAgent(messagebusConfig, bus.messageBus()); // Configure the wrapped MessageBus with a routing table.
- return bus;
+ MessageBus bus = new MessageBus(net, mbusParams);
+ new ConfigAgent(messagebusConfig, bus); // Configure the wrapped MessageBus with a routing table.
+ return new SharedMessageBus(bus);
}
private static void logSystemInfo(ContainerMbusConfig containerMbusConfig, long maxPendingSize) {
diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/shared/NullNetwork.java b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/NullNetwork.java
index bef3fdc1e34..a7cf5fec7d1 100644
--- a/container-messagebus/src/main/java/com/yahoo/messagebus/shared/NullNetwork.java
+++ b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/NullNetwork.java
@@ -14,7 +14,7 @@ import java.util.List;
*
* @author Vegard Havdal
*/
-class NullNetwork implements Network {
+public class NullNetwork implements Network {
@Override
public boolean waitUntilReady(double seconds) {
diff --git a/container-messagebus/src/test/java/com/yahoo/container/jdisc/messagebus/MbusClientProviderTest.java b/container-messagebus/src/test/java/com/yahoo/container/jdisc/messagebus/MbusClientProviderTest.java
index 9512c1a4873..d36234b0ccb 100644
--- a/container-messagebus/src/test/java/com/yahoo/container/jdisc/messagebus/MbusClientProviderTest.java
+++ b/container-messagebus/src/test/java/com/yahoo/container/jdisc/messagebus/MbusClientProviderTest.java
@@ -9,6 +9,9 @@ import com.yahoo.container.jdisc.messagebus.SessionCache;
import com.yahoo.document.config.DocumentmanagerConfig;
import com.yahoo.documentapi.messagebus.protocol.DocumentProtocolPoliciesConfig;
import com.yahoo.messagebus.MessagebusConfig;
+import com.yahoo.messagebus.network.NetworkMultiplexer;
+import com.yahoo.messagebus.network.rpc.RPCNetwork;
+import com.yahoo.messagebus.shared.NullNetwork;
import com.yahoo.vespa.config.content.DistributionConfig;
import com.yahoo.vespa.config.content.LoadTypeConfig;
import org.junit.Test;
@@ -37,14 +40,13 @@ public class MbusClientProviderTest {
}
private void testClient(SessionConfig config) {
- SessionCache cache = new SessionCache(new ContainerMbusConfig.Builder().build(),
+ SessionCache cache = new SessionCache(NetworkMultiplexer.dedicated(new NullNetwork()),
+ new ContainerMbusConfig.Builder().build(),
new DocumentmanagerConfig.Builder().build(),
new LoadTypeConfig.Builder().build(),
- new SlobroksConfig.Builder().build(),
new MessagebusConfig.Builder().build(),
new DocumentProtocolPoliciesConfig.Builder().build(),
- new DistributionConfig.Builder().build(),
- "test");
+ new DistributionConfig.Builder().build());
MbusClientProvider p = new MbusClientProvider(cache, config);
assertNotNull(p.get());
p.deconstruct();