aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--config-model/src/main/java/com/yahoo/vespa/model/container/ApplicationContainer.java2
-rw-r--r--container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/NetworkMultiplexerHolder.java45
-rw-r--r--container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/NetworkMultiplexerProvider.java44
-rw-r--r--container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java58
-rw-r--r--container-messagebus/src/main/java/com/yahoo/messagebus/shared/NullNetwork.java2
-rw-r--r--container-messagebus/src/test/java/com/yahoo/container/jdisc/messagebus/MbusClientProviderTest.java10
-rw-r--r--docproc/src/test/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerTestBase.java12
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/network/NetworkMultiplexer.java11
-rw-r--r--messagebus/src/test/java/com/yahoo/messagebus/network/NetworkMultiplexerTest.java5
9 files changed, 137 insertions, 52 deletions
diff --git a/config-model/src/main/java/com/yahoo/vespa/model/container/ApplicationContainer.java b/config-model/src/main/java/com/yahoo/vespa/model/container/ApplicationContainer.java
index 21f1f27d5af..792fa3f1884 100644
--- a/config-model/src/main/java/com/yahoo/vespa/model/container/ApplicationContainer.java
+++ b/config-model/src/main/java/com/yahoo/vespa/model/container/ApplicationContainer.java
@@ -32,6 +32,8 @@ public final class ApplicationContainer extends Container implements
super(parent, name, retired, index, deployState);
this.isHostedVespa = deployState.isHosted();
+ addComponent(new SimpleComponent("com.yahoo.container.jdisc.messagebus.NetworkMultiplexerHolder"));
+ addComponent(new SimpleComponent("com.yahoo.container.jdisc.messagebus.NetworkMultiplexerProvider"));
addComponent(new SimpleComponent("com.yahoo.container.jdisc.messagebus.SessionCache"));
}
diff --git a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/NetworkMultiplexerHolder.java b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/NetworkMultiplexerHolder.java
new file mode 100644
index 00000000000..9020bfaffe3
--- /dev/null
+++ b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/NetworkMultiplexerHolder.java
@@ -0,0 +1,45 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.container.jdisc.messagebus;
+
+import com.yahoo.component.AbstractComponent;
+import com.yahoo.messagebus.network.Network;
+import com.yahoo.messagebus.network.NetworkMultiplexer;
+import com.yahoo.messagebus.network.rpc.RPCNetwork;
+import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
+import com.yahoo.messagebus.shared.NullNetwork;
+
+/**
+ * Holds a reference to a singleton {@link NetworkMultiplexer}.
+ *
+ * @author jonmv
+ */
+public class NetworkMultiplexerHolder extends AbstractComponent {
+
+ private final Object monitor = new Object();
+ private boolean destroyed = false;
+ private NetworkMultiplexer net;
+
+ /** Get the singleton RPCNetworkAdapter, creating it if this hasn't yet been done. */
+ public NetworkMultiplexer get(RPCNetworkParams params) {
+ synchronized (monitor) {
+ if (destroyed)
+ throw new IllegalStateException("Component already destroyed");
+
+ return net = net != null ? net : NetworkMultiplexer.shared(newNetwork(params));
+ }
+ }
+
+ private Network newNetwork(RPCNetworkParams params) {
+ return params.getSlobroksConfig().slobrok().isEmpty() ? new NullNetwork() : new RPCNetwork(params);
+ }
+
+ @Override
+ public void deconstruct() {
+ synchronized (monitor) {
+ net.destroy();
+ net = null;
+ destroyed = true;
+ }
+ }
+
+}
diff --git a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/NetworkMultiplexerProvider.java b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/NetworkMultiplexerProvider.java
new file mode 100644
index 00000000000..650ac92e779
--- /dev/null
+++ b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/NetworkMultiplexerProvider.java
@@ -0,0 +1,44 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.container.jdisc.messagebus;
+
+import com.google.inject.Inject;
+import com.yahoo.cloud.config.SlobroksConfig;
+import com.yahoo.container.jdisc.ContainerMbusConfig;
+import com.yahoo.messagebus.network.Identity;
+import com.yahoo.messagebus.network.NetworkMultiplexer;
+import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
+
+/**
+ * Injectable component which provides an {@link NetworkMultiplexer}, creating one if needed,
+ * i.e., the first time this is created in a container--subsequent creations of this will reuse
+ * the underlying network that was created initially. This breaks the DI pattern, but must be done
+ * because the network is a unique resource which cannot exist in several versions simultaneously.
+ *
+ * @author jonmv
+ */
+public class NetworkMultiplexerProvider {
+
+ private final NetworkMultiplexer net;
+
+ @Inject
+ public NetworkMultiplexerProvider(NetworkMultiplexerHolder net, ContainerMbusConfig mbusConfig, SlobroksConfig slobroksConfig) {
+ this(net, mbusConfig, slobroksConfig, System.getProperty("config.id")); //:
+ }
+
+ public NetworkMultiplexerProvider(NetworkMultiplexerHolder net, ContainerMbusConfig mbusConfig, SlobroksConfig slobroksConfig, String identity) {
+ this.net = net.get(asParameters(mbusConfig, slobroksConfig, identity));
+ }
+
+ public static RPCNetworkParams asParameters(ContainerMbusConfig mbusConfig, SlobroksConfig slobroksConfig, String identity) {
+ return new RPCNetworkParams().setSlobroksConfig(slobroksConfig)
+ .setIdentity(new Identity(identity))
+ .setListenPort(mbusConfig.port())
+ .setNumTargetsPerSpec(mbusConfig.numconnectionspertarget())
+ .setNumNetworkThreads(mbusConfig.numthreads())
+ .setTransportEventsBeforeWakeup(mbusConfig.transport_events_before_wakeup())
+ .setOptimization(RPCNetworkParams.Optimization.valueOf(mbusConfig.optimize_for().name()));
+ }
+
+ public NetworkMultiplexer net() { return net; }
+
+}
diff --git a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java
index 850cfc0b4bf..3e3902b35aa 100644
--- a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java
+++ b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java
@@ -18,14 +18,14 @@ import com.yahoo.jdisc.SharedResource;
import com.yahoo.messagebus.ConfigAgent;
import com.yahoo.messagebus.DynamicThrottlePolicy;
import com.yahoo.messagebus.IntermediateSessionParams;
+import com.yahoo.messagebus.MessageBus;
import com.yahoo.messagebus.MessageBusParams;
import com.yahoo.messagebus.MessagebusConfig;
import com.yahoo.messagebus.Protocol;
import com.yahoo.messagebus.SourceSessionParams;
import com.yahoo.messagebus.StaticThrottlePolicy;
import com.yahoo.messagebus.ThrottlePolicy;
-import com.yahoo.messagebus.network.Identity;
-import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
+import com.yahoo.messagebus.network.NetworkMultiplexer;
import com.yahoo.messagebus.shared.SharedIntermediateSession;
import com.yahoo.messagebus.shared.SharedMessageBus;
import com.yahoo.messagebus.shared.SharedSourceSession;
@@ -65,44 +65,42 @@ public final class SessionCache extends AbstractComponent {
private final SourceSessionCreator sourcesCreator = new SourceSessionCreator();
@Inject
- public SessionCache(ContainerMbusConfig containerMbusConfig, DocumentmanagerConfig documentmanagerConfig,
- LoadTypeConfig loadTypeConfig, SlobroksConfig slobroksConfig,
- MessagebusConfig messagebusConfig, DocumentProtocolPoliciesConfig policiesConfig,
+ public SessionCache(NetworkMultiplexerProvider nets, ContainerMbusConfig containerMbusConfig,
+ DocumentmanagerConfig documentmanagerConfig,
+ LoadTypeConfig loadTypeConfig, MessagebusConfig messagebusConfig,
+ DocumentProtocolPoliciesConfig policiesConfig,
DistributionConfig distributionConfig) {
- this(containerMbusConfig, documentmanagerConfig, loadTypeConfig, slobroksConfig,
- messagebusConfig, policiesConfig, distributionConfig, System.getProperty("config.id")); //:
+ this(nets.net(), containerMbusConfig, documentmanagerConfig,
+ loadTypeConfig, messagebusConfig, policiesConfig, distributionConfig);
+
}
- public SessionCache(ContainerMbusConfig containerMbusConfig, DocumentmanagerConfig documentmanagerConfig,
- LoadTypeConfig loadTypeConfig, SlobroksConfig slobroksConfig,
- MessagebusConfig messagebusConfig, DocumentProtocolPoliciesConfig policiesConfig,
- DistributionConfig distributionConfig, String identity) {
- this(containerMbusConfig,
- slobroksConfig,
+ public SessionCache(NetworkMultiplexer net, ContainerMbusConfig containerMbusConfig,
+ DocumentmanagerConfig documentmanagerConfig,
+ LoadTypeConfig loadTypeConfig, MessagebusConfig messagebusConfig,
+ DocumentProtocolPoliciesConfig policiesConfig,
+ DistributionConfig distributionConfig) {
+ this(net,
+ containerMbusConfig,
messagebusConfig,
- identity,
new DocumentProtocol(new DocumentTypeManager(documentmanagerConfig),
new LoadTypeSet(loadTypeConfig),
policiesConfig,
distributionConfig));
}
- public SessionCache(ContainerMbusConfig containerMbusConfig, SlobroksConfig slobroksConfig,
- MessagebusConfig messagebusConfig, String identity, Protocol protocol) {
- this.messageBus = createSharedMessageBus(containerMbusConfig,
- messagebusConfig,
- slobroksConfig,
- identity,
- protocol);
+ public SessionCache(NetworkMultiplexer net, ContainerMbusConfig containerMbusConfig,
+ MessagebusConfig messagebusConfig, Protocol protocol) {
+ this.messageBus = createSharedMessageBus(net, containerMbusConfig, messagebusConfig, protocol);
}
public void deconstruct() {
messageBus.release();
}
- private static SharedMessageBus createSharedMessageBus(ContainerMbusConfig mbusConfig,
+ private static SharedMessageBus createSharedMessageBus(NetworkMultiplexer net,
+ ContainerMbusConfig mbusConfig,
MessagebusConfig messagebusConfig,
- SlobroksConfig slobroksConfig, String identity,
Protocol protocol) {
MessageBusParams mbusParams = new MessageBusParams().addProtocol(protocol);
@@ -114,17 +112,9 @@ public final class SessionCache extends AbstractComponent {
mbusParams.setMaxPendingCount(mbusConfig.maxpendingcount());
mbusParams.setMaxPendingSize(maxPendingSize);
- RPCNetworkParams netParams = new RPCNetworkParams()
- .setSlobroksConfig(slobroksConfig)
- .setIdentity(new Identity(identity))
- .setListenPort(mbusConfig.port())
- .setNumTargetsPerSpec(mbusConfig.numconnectionspertarget())
- .setNumNetworkThreads(mbusConfig.numthreads())
- .setTransportEventsBeforeWakeup(mbusConfig.transport_events_before_wakeup())
- .setOptimization(RPCNetworkParams.Optimization.valueOf(mbusConfig.optimize_for().name()));
- SharedMessageBus bus = SharedMessageBus.newInstance(mbusParams, netParams);
- new ConfigAgent(messagebusConfig, bus.messageBus()); // Configure the wrapped MessageBus with a routing table.
- return bus;
+ MessageBus bus = new MessageBus(net, mbusParams);
+ new ConfigAgent(messagebusConfig, bus); // Configure the wrapped MessageBus with a routing table.
+ return new SharedMessageBus(bus);
}
private static void logSystemInfo(ContainerMbusConfig containerMbusConfig, long maxPendingSize) {
diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/shared/NullNetwork.java b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/NullNetwork.java
index bef3fdc1e34..a7cf5fec7d1 100644
--- a/container-messagebus/src/main/java/com/yahoo/messagebus/shared/NullNetwork.java
+++ b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/NullNetwork.java
@@ -14,7 +14,7 @@ import java.util.List;
*
* @author Vegard Havdal
*/
-class NullNetwork implements Network {
+public class NullNetwork implements Network {
@Override
public boolean waitUntilReady(double seconds) {
diff --git a/container-messagebus/src/test/java/com/yahoo/container/jdisc/messagebus/MbusClientProviderTest.java b/container-messagebus/src/test/java/com/yahoo/container/jdisc/messagebus/MbusClientProviderTest.java
index 9512c1a4873..d36234b0ccb 100644
--- a/container-messagebus/src/test/java/com/yahoo/container/jdisc/messagebus/MbusClientProviderTest.java
+++ b/container-messagebus/src/test/java/com/yahoo/container/jdisc/messagebus/MbusClientProviderTest.java
@@ -9,6 +9,9 @@ import com.yahoo.container.jdisc.messagebus.SessionCache;
import com.yahoo.document.config.DocumentmanagerConfig;
import com.yahoo.documentapi.messagebus.protocol.DocumentProtocolPoliciesConfig;
import com.yahoo.messagebus.MessagebusConfig;
+import com.yahoo.messagebus.network.NetworkMultiplexer;
+import com.yahoo.messagebus.network.rpc.RPCNetwork;
+import com.yahoo.messagebus.shared.NullNetwork;
import com.yahoo.vespa.config.content.DistributionConfig;
import com.yahoo.vespa.config.content.LoadTypeConfig;
import org.junit.Test;
@@ -37,14 +40,13 @@ public class MbusClientProviderTest {
}
private void testClient(SessionConfig config) {
- SessionCache cache = new SessionCache(new ContainerMbusConfig.Builder().build(),
+ SessionCache cache = new SessionCache(NetworkMultiplexer.dedicated(new NullNetwork()),
+ new ContainerMbusConfig.Builder().build(),
new DocumentmanagerConfig.Builder().build(),
new LoadTypeConfig.Builder().build(),
- new SlobroksConfig.Builder().build(),
new MessagebusConfig.Builder().build(),
new DocumentProtocolPoliciesConfig.Builder().build(),
- new DistributionConfig.Builder().build(),
- "test");
+ new DistributionConfig.Builder().build());
MbusClientProvider p = new MbusClientProvider(cache, config);
assertNotNull(p.get());
p.deconstruct();
diff --git a/docproc/src/test/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerTestBase.java b/docproc/src/test/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerTestBase.java
index c437261224d..a2b60bb779c 100644
--- a/docproc/src/test/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerTestBase.java
+++ b/docproc/src/test/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerTestBase.java
@@ -1,12 +1,14 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.docproc.jdisc;
+import com.yahoo.cloud.config.SlobroksConfig;
import com.yahoo.collections.Pair;
import com.yahoo.component.ComponentId;
import com.yahoo.component.provider.ComponentRegistry;
import com.yahoo.container.core.document.ContainerDocumentConfig;
import com.yahoo.container.jdisc.ContainerMbusConfig;
import com.yahoo.container.jdisc.messagebus.MbusServerProvider;
+import com.yahoo.container.jdisc.messagebus.NetworkMultiplexerProvider;
import com.yahoo.container.jdisc.messagebus.SessionCache;
import com.yahoo.docproc.CallStack;
import com.yahoo.docproc.DocprocService;
@@ -28,6 +30,8 @@ import com.yahoo.messagebus.SourceSessionParams;
import com.yahoo.messagebus.jdisc.MbusClient;
import com.yahoo.messagebus.jdisc.test.RemoteServer;
import com.yahoo.messagebus.jdisc.test.ServerTestDriver;
+import com.yahoo.messagebus.network.NetworkMultiplexer;
+import com.yahoo.messagebus.network.rpc.RPCNetwork;
import com.yahoo.messagebus.routing.Route;
import com.yahoo.messagebus.shared.SharedSourceSession;
import com.yahoo.vespa.config.content.DistributionConfig;
@@ -58,10 +62,12 @@ public abstract class DocumentProcessingHandlerTestBase {
driver = ServerTestDriver.newInactiveInstanceWithProtocol(protocol, true);
- sessionCache = new SessionCache(new ContainerMbusConfig.Builder().build(),
- driver.client().slobroksConfig(),
+ RPCNetwork net = new RPCNetwork(NetworkMultiplexerProvider.asParameters(new ContainerMbusConfig.Builder().build(),
+ driver.client().slobroksConfig(),
+ "test"));
+ sessionCache = new SessionCache(NetworkMultiplexer.dedicated(net),
+ new ContainerMbusConfig.Builder().build(),
new MessagebusConfig.Builder().build(),
- "test",
protocol);
ContainerBuilder builder = driver.parent().newContainerBuilder();
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/NetworkMultiplexer.java b/messagebus/src/main/java/com/yahoo/messagebus/network/NetworkMultiplexer.java
index 0a2a69368ac..252952e058c 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/network/NetworkMultiplexer.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/network/NetworkMultiplexer.java
@@ -25,7 +25,7 @@ import java.util.logging.Logger;
*
* @author jonmv
*/
-public class NetworkMultiplexer extends AbstractComponent implements NetworkOwner {
+public class NetworkMultiplexer implements NetworkOwner {
private static final Logger log = Logger.getLogger(NetworkMultiplexer.class.getName());
@@ -114,14 +114,13 @@ public class NetworkMultiplexer extends AbstractComponent implements NetworkOwne
net.shutdown();
}
- @Override
- public void deconstruct() {
+ public void destroy() {
if ( ! shared)
- throw new UnsupportedOperationException("Deconstruct called on a dedicated multiplexer; " +
- "this shuts down when detached from");
+ throw new UnsupportedOperationException("Destroy called on a dedicated multiplexer; " +
+ "this automatically shuts down when detached from");
if ( ! owners.isEmpty())
- log.warning("NetworkMultiplexer shut down before all owners detached: " + this);
+ log.warning("NetworkMultiplexer destroyed before all owners detached: " + this);
net.shutdown();
}
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/network/NetworkMultiplexerTest.java b/messagebus/src/test/java/com/yahoo/messagebus/network/NetworkMultiplexerTest.java
index 81b6e4cac27..808a0e95585 100644
--- a/messagebus/src/test/java/com/yahoo/messagebus/network/NetworkMultiplexerTest.java
+++ b/messagebus/src/test/java/com/yahoo/messagebus/network/NetworkMultiplexerTest.java
@@ -9,16 +9,13 @@ import com.yahoo.messagebus.test.SimpleMessage;
import com.yahoo.messagebus.test.SimpleProtocol;
import com.yahoo.text.Utf8Array;
import com.yahoo.text.Utf8String;
-import org.junit.Assert;
import org.junit.Test;
-import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Queue;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -81,7 +78,7 @@ public class NetworkMultiplexerTest {
shared.detach(owner2);
assertFalse(net.shutDown.get());
- shared.deconstruct();
+ shared.destroy();
assertTrue(net.shutDown.get());
}