summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJon Marius Venstad <jonmv@users.noreply.github.com>2021-08-17 19:30:07 +0200
committerGitHub <noreply@github.com>2021-08-17 19:30:07 +0200
commitf03038eec17af56fe8c3123cb016c868133a6588 (patch)
tree1d1ae784967e60a36585b6233632d22aca680f35
parentf76ce2e45f0b4142fd4451310b240e9277decb69 (diff)
parentff34e29daf239a3658bf0003af50fc6bfd0cd9fa (diff)
Merge pull request #18525 from vespa-engine/jonmv/reapply-di-session-cache
Session cache with injected config and network [run-systemtest]
-rw-r--r--config-model/src/main/java/com/yahoo/vespa/model/container/ApplicationContainer.java5
-rw-r--r--config-model/src/main/java/com/yahoo/vespa/model/container/ApplicationContainerCluster.java1
-rw-r--r--container-disc/src/main/java/com/yahoo/container/jdisc/ConfiguredApplication.java8
-rw-r--r--container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/NetworkMultiplexerHolder.java45
-rw-r--r--container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/NetworkMultiplexerProvider.java44
-rw-r--r--container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java122
-rw-r--r--container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusServer.java1
-rw-r--r--container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/ClientTestDriver.java5
-rw-r--r--container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteClient.java34
-rw-r--r--container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteServer.java39
-rw-r--r--container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/ServerTestDriver.java9
-rw-r--r--container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/TestUtils.java21
-rw-r--r--container-messagebus/src/main/java/com/yahoo/messagebus/shared/NullNetwork.java4
-rw-r--r--container-messagebus/src/main/java/com/yahoo/messagebus/shared/ServerSession.java13
-rw-r--r--container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedDestinationSession.java5
-rw-r--r--container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedIntermediateSession.java5
-rw-r--r--container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedMessageBus.java8
-rw-r--r--container-messagebus/src/test/java/com/yahoo/container/jdisc/messagebus/MbusClientProviderTest.java19
-rw-r--r--container-messagebus/src/test/java/com/yahoo/container/jdisc/messagebus/MbusSessionKeyTestCase.java2
-rw-r--r--container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusServerTestCase.java18
-rw-r--r--container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/test/ClientTestDriverTestCase.java6
-rw-r--r--container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/test/ServerTestDriverTestCase.java5
-rw-r--r--container-messagebus/src/test/java/com/yahoo/messagebus/shared/SharedDestinationSessionTestCase.java10
-rw-r--r--container-messagebus/src/test/java/com/yahoo/messagebus/shared/SharedIntermediateSessionTestCase.java14
-rw-r--r--container-messagebus/src/test/java/com/yahoo/messagebus/shared/SharedSourceSessionTestCase.java10
-rw-r--r--container-messagebus/src/test/resources/config/clientprovider/container-mbus.cfg0
-rw-r--r--container-messagebus/src/test/resources/config/clientprovider/documentmanager.cfg0
-rw-r--r--container-messagebus/src/test/resources/config/clientprovider/load-type.cfg0
-rw-r--r--container-messagebus/src/test/resources/config/clientprovider/messagebus.cfg0
-rw-r--r--container-messagebus/src/test/resources/config/clientprovider/slobroks.cfg0
-rw-r--r--docproc/src/test/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerAllMessageTypesTestCase.java2
-rw-r--r--docproc/src/test/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerTestBase.java21
-rw-r--r--messagebus/abi-spec.json2
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java51
-rwxr-xr-xmessagebus/src/main/java/com/yahoo/messagebus/Messenger.java23
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/network/NetworkMultiplexer.java143
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/network/NetworkOwner.java11
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalNetwork.java2
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java20
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSend.java52
-rwxr-xr-xmessagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSendAdapter.java7
-rwxr-xr-xmessagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSendV1.java3
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSendV2.java3
-rwxr-xr-xmessagebus/src/main/java/com/yahoo/messagebus/routing/RoutingNode.java8
-rw-r--r--messagebus/src/test/java/com/yahoo/messagebus/network/NetworkMultiplexerTest.java188
45 files changed, 706 insertions, 283 deletions
diff --git a/config-model/src/main/java/com/yahoo/vespa/model/container/ApplicationContainer.java b/config-model/src/main/java/com/yahoo/vespa/model/container/ApplicationContainer.java
index 02a0f606603..792fa3f1884 100644
--- a/config-model/src/main/java/com/yahoo/vespa/model/container/ApplicationContainer.java
+++ b/config-model/src/main/java/com/yahoo/vespa/model/container/ApplicationContainer.java
@@ -9,6 +9,7 @@ import com.yahoo.config.model.producer.AbstractConfigProducer;
import com.yahoo.config.provision.ClusterSpec;
import com.yahoo.config.provision.NodeResources;
import com.yahoo.search.config.QrStartConfig;
+import com.yahoo.vespa.model.container.component.SimpleComponent;
/**
* A container that is typically used by container clusters set up from the user application.
@@ -30,6 +31,10 @@ public final class ApplicationContainer extends Container implements
public ApplicationContainer(AbstractConfigProducer<?> parent, String name, boolean retired, int index, DeployState deployState) {
super(parent, name, retired, index, deployState);
this.isHostedVespa = deployState.isHosted();
+
+ addComponent(new SimpleComponent("com.yahoo.container.jdisc.messagebus.NetworkMultiplexerHolder"));
+ addComponent(new SimpleComponent("com.yahoo.container.jdisc.messagebus.NetworkMultiplexerProvider"));
+ addComponent(new SimpleComponent("com.yahoo.container.jdisc.messagebus.SessionCache"));
}
@Override
diff --git a/config-model/src/main/java/com/yahoo/vespa/model/container/ApplicationContainerCluster.java b/config-model/src/main/java/com/yahoo/vespa/model/container/ApplicationContainerCluster.java
index d0bcfde3c0c..57169391cbb 100644
--- a/config-model/src/main/java/com/yahoo/vespa/model/container/ApplicationContainerCluster.java
+++ b/config-model/src/main/java/com/yahoo/vespa/model/container/ApplicationContainerCluster.java
@@ -107,6 +107,7 @@ public final class ApplicationContainerCluster extends ContainerCluster<Applicat
addSimpleComponent("com.yahoo.container.jdisc.AthenzIdentityProviderProvider");
addSimpleComponent("com.yahoo.container.jdisc.SystemInfoProvider");
addSimpleComponent(com.yahoo.container.core.documentapi.DocumentAccessProvider.class.getName());
+
addMetricsHandlers();
addTestrunnerComponentsIfTester(deployState);
}
diff --git a/container-disc/src/main/java/com/yahoo/container/jdisc/ConfiguredApplication.java b/container-disc/src/main/java/com/yahoo/container/jdisc/ConfiguredApplication.java
index 0e4c4446778..9fe3728dc2c 100644
--- a/container-disc/src/main/java/com/yahoo/container/jdisc/ConfiguredApplication.java
+++ b/container-disc/src/main/java/com/yahoo/container/jdisc/ConfiguredApplication.java
@@ -18,7 +18,6 @@ import com.yahoo.container.di.config.Subscriber;
import com.yahoo.container.di.config.SubscriberFactory;
import com.yahoo.container.http.filter.FilterChainRepository;
import com.yahoo.container.jdisc.component.Deconstructor;
-import com.yahoo.container.jdisc.messagebus.SessionCache;
import com.yahoo.container.jdisc.metric.DisableGuiceMetric;
import com.yahoo.jdisc.Metric;
import com.yahoo.jdisc.application.Application;
@@ -77,7 +76,6 @@ public final class ConfiguredApplication implements Application {
// to config to make sure that container will be registered in slobrok (by {@link com.yahoo.jrt.slobrok.api.Register})
// if slobrok config changes (typically slobroks moving to other nodes)
private final Optional<SlobrokConfigSubscriber> slobrokConfigSubscriber;
- private final SessionCache sessionCache;
//TODO: FilterChainRepository should instead always be set up in the model.
private final FilterChainRepository defaultFilterChainRepository =
@@ -127,7 +125,6 @@ public final class ConfiguredApplication implements Application {
this.slobrokConfigSubscriber = (subscriberFactory instanceof CloudSubscriberFactory)
? Optional.of(new SlobrokConfigSubscriber(configId))
: Optional.empty();
- this.sessionCache = new SessionCache(configId);
this.restrictedOsgiFramework = new DisableOsgiFramework(new RestrictedBundleContext(osgiFramework.bundleContext()));
}
@@ -148,8 +145,8 @@ public final class ConfiguredApplication implements Application {
}
/**
- * The container has no RPC methods, but we still need an RPC sever
- * to register in Slobrok to enable orchestration
+ * The container has no RPC methods, but we still need an RPC server
+ * to register in Slobrok to enable orchestration.
*/
private Register registerInSlobrok(QrConfig qrConfig) {
if ( ! qrConfig.rpc().enabled()) return null;
@@ -349,7 +346,6 @@ public final class ConfiguredApplication implements Application {
bind(OsgiFramework.class).toInstance(restrictedOsgiFramework);
bind(com.yahoo.jdisc.Timer.class).toInstance(timerSingleton);
bind(FilterChainRepository.class).toInstance(defaultFilterChainRepository);
- bind(SessionCache.class).toInstance(sessionCache); // Needed by e.g. FeedHandler
}
});
}
diff --git a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/NetworkMultiplexerHolder.java b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/NetworkMultiplexerHolder.java
new file mode 100644
index 00000000000..9020bfaffe3
--- /dev/null
+++ b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/NetworkMultiplexerHolder.java
@@ -0,0 +1,45 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.container.jdisc.messagebus;
+
+import com.yahoo.component.AbstractComponent;
+import com.yahoo.messagebus.network.Network;
+import com.yahoo.messagebus.network.NetworkMultiplexer;
+import com.yahoo.messagebus.network.rpc.RPCNetwork;
+import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
+import com.yahoo.messagebus.shared.NullNetwork;
+
+/**
+ * Holds a reference to a singleton {@link NetworkMultiplexer}.
+ *
+ * @author jonmv
+ */
+public class NetworkMultiplexerHolder extends AbstractComponent {
+
+ private final Object monitor = new Object();
+ private boolean destroyed = false;
+ private NetworkMultiplexer net;
+
+ /** Get the singleton RPCNetworkAdapter, creating it if this hasn't yet been done. */
+ public NetworkMultiplexer get(RPCNetworkParams params) {
+ synchronized (monitor) {
+ if (destroyed)
+ throw new IllegalStateException("Component already destroyed");
+
+ return net = net != null ? net : NetworkMultiplexer.shared(newNetwork(params));
+ }
+ }
+
+ private Network newNetwork(RPCNetworkParams params) {
+ return params.getSlobroksConfig().slobrok().isEmpty() ? new NullNetwork() : new RPCNetwork(params);
+ }
+
+ @Override
+ public void deconstruct() {
+ synchronized (monitor) {
+ net.destroy();
+ net = null;
+ destroyed = true;
+ }
+ }
+
+}
diff --git a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/NetworkMultiplexerProvider.java b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/NetworkMultiplexerProvider.java
new file mode 100644
index 00000000000..650ac92e779
--- /dev/null
+++ b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/NetworkMultiplexerProvider.java
@@ -0,0 +1,44 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.container.jdisc.messagebus;
+
+import com.google.inject.Inject;
+import com.yahoo.cloud.config.SlobroksConfig;
+import com.yahoo.container.jdisc.ContainerMbusConfig;
+import com.yahoo.messagebus.network.Identity;
+import com.yahoo.messagebus.network.NetworkMultiplexer;
+import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
+
+/**
+ * Injectable component which provides an {@link NetworkMultiplexer}, creating one if needed,
+ * i.e., the first time this is created in a container--subsequent creations of this will reuse
+ * the underlying network that was created initially. This breaks the DI pattern, but must be done
+ * because the network is a unique resource which cannot exist in several versions simultaneously.
+ *
+ * @author jonmv
+ */
+public class NetworkMultiplexerProvider {
+
+ private final NetworkMultiplexer net;
+
+ @Inject
+ public NetworkMultiplexerProvider(NetworkMultiplexerHolder net, ContainerMbusConfig mbusConfig, SlobroksConfig slobroksConfig) {
+ this(net, mbusConfig, slobroksConfig, System.getProperty("config.id")); //:
+ }
+
+ public NetworkMultiplexerProvider(NetworkMultiplexerHolder net, ContainerMbusConfig mbusConfig, SlobroksConfig slobroksConfig, String identity) {
+ this.net = net.get(asParameters(mbusConfig, slobroksConfig, identity));
+ }
+
+ public static RPCNetworkParams asParameters(ContainerMbusConfig mbusConfig, SlobroksConfig slobroksConfig, String identity) {
+ return new RPCNetworkParams().setSlobroksConfig(slobroksConfig)
+ .setIdentity(new Identity(identity))
+ .setListenPort(mbusConfig.port())
+ .setNumTargetsPerSpec(mbusConfig.numconnectionspertarget())
+ .setNumNetworkThreads(mbusConfig.numthreads())
+ .setTransportEventsBeforeWakeup(mbusConfig.transport_events_before_wakeup())
+ .setOptimization(RPCNetworkParams.Optimization.valueOf(mbusConfig.optimize_for().name()));
+ }
+
+ public NetworkMultiplexer net() { return net; }
+
+}
diff --git a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java
index d65b2f7cc12..3e3902b35aa 100644
--- a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java
+++ b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java
@@ -1,34 +1,40 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.container.jdisc.messagebus;
+import com.google.inject.Inject;
+import com.yahoo.cloud.config.SlobroksConfig;
import com.yahoo.component.AbstractComponent;
-import com.yahoo.config.subscription.ConfigGetter;
import com.yahoo.container.jdisc.ContainerMbusConfig;
import com.yahoo.document.DocumentTypeManager;
import com.yahoo.document.DocumentUtil;
+import com.yahoo.document.config.DocumentmanagerConfig;
import com.yahoo.documentapi.messagebus.loadtypes.LoadTypeSet;
import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
+import com.yahoo.documentapi.messagebus.protocol.DocumentProtocolPoliciesConfig;
import com.yahoo.jdisc.ReferencedResource;
import com.yahoo.jdisc.References;
import com.yahoo.jdisc.ResourceReference;
import com.yahoo.jdisc.SharedResource;
-import java.util.logging.Level;
import com.yahoo.messagebus.ConfigAgent;
import com.yahoo.messagebus.DynamicThrottlePolicy;
import com.yahoo.messagebus.IntermediateSessionParams;
+import com.yahoo.messagebus.MessageBus;
import com.yahoo.messagebus.MessageBusParams;
+import com.yahoo.messagebus.MessagebusConfig;
import com.yahoo.messagebus.Protocol;
import com.yahoo.messagebus.SourceSessionParams;
import com.yahoo.messagebus.StaticThrottlePolicy;
import com.yahoo.messagebus.ThrottlePolicy;
-import com.yahoo.messagebus.network.Identity;
-import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
+import com.yahoo.messagebus.network.NetworkMultiplexer;
import com.yahoo.messagebus.shared.SharedIntermediateSession;
import com.yahoo.messagebus.shared.SharedMessageBus;
import com.yahoo.messagebus.shared.SharedSourceSession;
+import com.yahoo.vespa.config.content.DistributionConfig;
+import com.yahoo.vespa.config.content.LoadTypeConfig;
import java.util.HashMap;
import java.util.Map;
+import java.util.logging.Level;
import java.util.logging.Logger;
/**
@@ -37,24 +43,19 @@ import java.util.logging.Logger;
* @author Steinar Knutsen
* @author Einar Rosenvinge
*/
-// TODO jonmv: Remove this? Only used sensibly by FeedHandlerV3, where only timeout varies.
+// TODO jonmv: Remove this: only used with more than one entry by FeedHandlerV3, where only timeout varies.
+// rant: This whole construct is because DI at one point didn't exist, so getting hold of a shared resource
+// or session was hard(?), and one resorted to routing through the Container, using URIs, to the correct
+// MbusClient, with or without throttling. This introduced the problem of ownership during shutdown,
+// which was solved with manual reference counting. This is all much better solved with DI, which (now)
+// owns everything, and does component shutdown in reverse construction order, which is always right.
+// So for the sake of everyone's mental health, this should all just be removed now! I suspect this is
+// even the case for Request; we can track in handlers, and warn when requests have been misplaced.
public final class SessionCache extends AbstractComponent {
private static final Logger log = Logger.getLogger(SessionCache.class.getName());
- //config
- private final String messagebusConfigId;
- private final String slobrokConfigId;
- private final String identity;
- private final String containerMbusConfigId;
- private final String documentManagerConfigId;
- private final String loadTypeConfigId;
- private final DocumentTypeManager documentTypeManager;
-
- // initialized in start()
- private ConfigAgent configAgent;
- private SharedMessageBus messageBus;
-
+ private final SharedMessageBus messageBus;
private final Object intermediateLock = new Object();
private final Map<String, SharedIntermediateSession> intermediates = new HashMap<>();
private final IntermediateSessionCreator intermediatesCreator = new IntermediateSessionCreator();
@@ -63,50 +64,43 @@ public final class SessionCache extends AbstractComponent {
private final Map<SourceSessionKey, SharedSourceSession> sources = new HashMap<>();
private final SourceSessionCreator sourcesCreator = new SourceSessionCreator();
- public SessionCache(String messagebusConfigId, String slobrokConfigId, String identity,
- String containerMbusConfigId, String documentManagerConfigId,
- String loadTypeConfigId,
- DocumentTypeManager documentTypeManager) {
- this.messagebusConfigId = messagebusConfigId;
- this.slobrokConfigId = slobrokConfigId;
- this.identity = identity;
- this.containerMbusConfigId = containerMbusConfigId;
- this.documentManagerConfigId = documentManagerConfigId;
- this.loadTypeConfigId = loadTypeConfigId;
- this.documentTypeManager = documentTypeManager;
- }
+ @Inject
+ public SessionCache(NetworkMultiplexerProvider nets, ContainerMbusConfig containerMbusConfig,
+ DocumentmanagerConfig documentmanagerConfig,
+ LoadTypeConfig loadTypeConfig, MessagebusConfig messagebusConfig,
+ DocumentProtocolPoliciesConfig policiesConfig,
+ DistributionConfig distributionConfig) {
+ this(nets.net(), containerMbusConfig, documentmanagerConfig,
+ loadTypeConfig, messagebusConfig, policiesConfig, distributionConfig);
- public SessionCache(final String identity) {
- this(identity, identity, identity, identity, identity, identity, new DocumentTypeManager());
}
- public void deconstruct() {
- if (configAgent != null) {
- configAgent.shutdown();
- }
+ public SessionCache(NetworkMultiplexer net, ContainerMbusConfig containerMbusConfig,
+ DocumentmanagerConfig documentmanagerConfig,
+ LoadTypeConfig loadTypeConfig, MessagebusConfig messagebusConfig,
+ DocumentProtocolPoliciesConfig policiesConfig,
+ DistributionConfig distributionConfig) {
+ this(net,
+ containerMbusConfig,
+ messagebusConfig,
+ new DocumentProtocol(new DocumentTypeManager(documentmanagerConfig),
+ new LoadTypeSet(loadTypeConfig),
+ policiesConfig,
+ distributionConfig));
}
-
- private void start() {
- ContainerMbusConfig mbusConfig = ConfigGetter.getConfig(ContainerMbusConfig.class, containerMbusConfigId);
- if (documentManagerConfigId != null) {
- documentTypeManager.configure(documentManagerConfigId);
- }
- LoadTypeSet loadTypeSet = new LoadTypeSet(loadTypeConfigId);
- DocumentProtocol protocol = new DocumentProtocol(documentTypeManager, identity, loadTypeSet);
- messageBus = createSharedMessageBus(mbusConfig, slobrokConfigId, identity, protocol);
- // TODO: stop doing subscriptions to config when that is to be solved in slobrok as well
- configAgent = new ConfigAgent(messagebusConfigId, messageBus.messageBus());
- configAgent.subscribe();
+ public SessionCache(NetworkMultiplexer net, ContainerMbusConfig containerMbusConfig,
+ MessagebusConfig messagebusConfig, Protocol protocol) {
+ this.messageBus = createSharedMessageBus(net, containerMbusConfig, messagebusConfig, protocol);
}
-
- private boolean isStarted() {
- return messageBus != null;
+ public void deconstruct() {
+ messageBus.release();
}
- private static SharedMessageBus createSharedMessageBus(ContainerMbusConfig mbusConfig,
- String slobrokConfigId, String identity,
+ private static SharedMessageBus createSharedMessageBus(NetworkMultiplexer net,
+ ContainerMbusConfig mbusConfig,
+ MessagebusConfig messagebusConfig,
Protocol protocol) {
MessageBusParams mbusParams = new MessageBusParams().addProtocol(protocol);
@@ -118,15 +112,9 @@ public final class SessionCache extends AbstractComponent {
mbusParams.setMaxPendingCount(mbusConfig.maxpendingcount());
mbusParams.setMaxPendingSize(maxPendingSize);
- RPCNetworkParams netParams = new RPCNetworkParams()
- .setSlobrokConfigId(slobrokConfigId)
- .setIdentity(new Identity(identity))
- .setListenPort(mbusConfig.port())
- .setNumTargetsPerSpec(mbusConfig.numconnectionspertarget())
- .setNumNetworkThreads(mbusConfig.numthreads())
- .setTransportEventsBeforeWakeup(mbusConfig.transport_events_before_wakeup())
- .setOptimization(RPCNetworkParams.Optimization.valueOf(mbusConfig.optimize_for().name()));
- return SharedMessageBus.newInstance(mbusParams, netParams);
+ MessageBus bus = new MessageBus(net, mbusParams);
+ new ConfigAgent(messagebusConfig, bus); // Configure the wrapped MessageBus with a routing table.
+ return new SharedMessageBus(bus);
}
private static void logSystemInfo(ContainerMbusConfig containerMbusConfig, long maxPendingSize) {
@@ -144,20 +132,10 @@ public final class SessionCache extends AbstractComponent {
}
ReferencedResource<SharedIntermediateSession> retainIntermediate(final IntermediateSessionParams p) {
- synchronized (this) {
- if (!isStarted()) {
- start();
- }
- }
return intermediatesCreator.retain(intermediateLock, intermediates, p);
}
public ReferencedResource<SharedSourceSession> retainSource(final SourceSessionParams p) {
- synchronized (this) {
- if (!isStarted()) {
- start();
- }
- }
return sourcesCreator.retain(sourceLock, sources, p);
}
diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusServer.java b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusServer.java
index e26e1e7e134..a2131f22dc4 100644
--- a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusServer.java
+++ b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusServer.java
@@ -50,6 +50,7 @@ public final class MbusServer extends AbstractResource implements ServerProvider
public void close() {
log.log(Level.FINE, "Closing message bus server.");
running.set(false);
+ session.close();
}
@Override
diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/ClientTestDriver.java b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/ClientTestDriver.java
index 111805d61b0..ea0ed7eadc8 100644
--- a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/ClientTestDriver.java
+++ b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/ClientTestDriver.java
@@ -33,7 +33,7 @@ public class ClientTestDriver {
this.server = server;
MessageBusParams mbusParams = new MessageBusParams().addProtocol(protocol);
- RPCNetworkParams netParams = new RPCNetworkParams().setSlobrokConfigId(server.slobrokId());
+ RPCNetworkParams netParams = new RPCNetworkParams().setSlobroksConfig(server.slobroksConfig());
SharedMessageBus mbus = SharedMessageBus.newInstance(mbusParams, netParams);
session = mbus.newSourceSession(new SourceSessionParams());
client = new MbusClient(session);
@@ -128,7 +128,4 @@ public class ClientTestDriver {
return new ClientTestDriver(RemoteServer.newInstanceWithInternSlobrok(), protocol);
}
- public static ClientTestDriver newInstanceWithExternSlobrok(String slobrokId) {
- return new ClientTestDriver(RemoteServer.newInstanceWithExternSlobrok(slobrokId), new SimpleProtocol());
- }
}
diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteClient.java b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteClient.java
index 57d0abd980b..6cd8fb8f34d 100644
--- a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteClient.java
+++ b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteClient.java
@@ -1,9 +1,17 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.messagebus.jdisc.test;
+import com.yahoo.cloud.config.SlobroksConfig;
import com.yahoo.jrt.ListenFailedException;
import com.yahoo.jrt.slobrok.server.Slobrok;
-import com.yahoo.messagebus.*;
+import com.yahoo.messagebus.Message;
+import com.yahoo.messagebus.MessageBus;
+import com.yahoo.messagebus.MessageBusParams;
+import com.yahoo.messagebus.Protocol;
+import com.yahoo.messagebus.Reply;
+import com.yahoo.messagebus.Result;
+import com.yahoo.messagebus.SourceSession;
+import com.yahoo.messagebus.SourceSessionParams;
import com.yahoo.messagebus.network.local.LocalNetwork;
import com.yahoo.messagebus.network.rpc.RPCNetwork;
import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
@@ -17,16 +25,14 @@ import java.util.concurrent.TimeUnit;
public class RemoteClient {
private final Slobrok slobrok;
- private final String slobrokId;
private final MessageBus mbus;
private final ReplyQueue queue = new ReplyQueue();
private final SourceSession session;
- private RemoteClient(Slobrok slobrok, String slobrokId, Protocol protocol, boolean network) {
- this.slobrok = slobrok;
- this.slobrokId = slobrok != null ? slobrok.configId() : slobrokId;
+ private RemoteClient(Protocol protocol, boolean network) {
+ this.slobrok = newSlobrok();
mbus = network
- ? new MessageBus(new RPCNetwork(new RPCNetworkParams().setSlobrokConfigId(this.slobrokId)),
+ ? new MessageBus(new RPCNetwork(new RPCNetworkParams().setSlobroksConfig(slobroksConfig())),
new MessageBusParams().addProtocol(protocol))
: new MessageBus(new LocalNetwork(), new MessageBusParams().addProtocol(protocol));
session = mbus.createSourceSession(new SourceSessionParams().setThrottlePolicy(null).setReplyHandler(queue));
@@ -40,28 +46,22 @@ public class RemoteClient {
return queue.awaitReply(timeout, unit);
}
- public String slobrokId() {
- return slobrokId;
+ public SlobroksConfig slobroksConfig() {
+ return TestUtils.configFor(slobrok);
}
public void close() {
session.destroy();
mbus.destroy();
- if (slobrok != null) {
- slobrok.stop();
- }
+ slobrok.stop();
}
public static RemoteClient newInstanceWithInternSlobrok(boolean network) {
- return new RemoteClient(newSlobrok(), null, new SimpleProtocol(), network);
- }
-
- public static RemoteClient newInstanceWithExternSlobrok(String slobrokId, boolean network) {
- return new RemoteClient(null, slobrokId, new SimpleProtocol(), network);
+ return new RemoteClient(new SimpleProtocol(), network);
}
public static RemoteClient newInstanceWithProtocolAndInternSlobrok(Protocol protocol, boolean network) {
- return new RemoteClient(newSlobrok(), null, protocol, network);
+ return new RemoteClient(protocol, network);
}
private static Slobrok newSlobrok() {
diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteServer.java b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteServer.java
index 1f0f82c4903..0fb23e33709 100644
--- a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteServer.java
+++ b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteServer.java
@@ -1,9 +1,16 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.messagebus.jdisc.test;
+import com.yahoo.cloud.config.SlobroksConfig;
import com.yahoo.jrt.ListenFailedException;
import com.yahoo.jrt.slobrok.server.Slobrok;
-import com.yahoo.messagebus.*;
+import com.yahoo.messagebus.DestinationSession;
+import com.yahoo.messagebus.DestinationSessionParams;
+import com.yahoo.messagebus.Message;
+import com.yahoo.messagebus.MessageBus;
+import com.yahoo.messagebus.MessageBusParams;
+import com.yahoo.messagebus.Protocol;
+import com.yahoo.messagebus.Reply;
import com.yahoo.messagebus.network.Identity;
import com.yahoo.messagebus.network.rpc.RPCNetwork;
import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
@@ -17,16 +24,14 @@ import java.util.concurrent.TimeUnit;
public class RemoteServer {
private final Slobrok slobrok;
- private final String slobrokId;
private final MessageBus mbus;
private final MessageQueue queue = new MessageQueue();
private final DestinationSession session;
- private RemoteServer(Slobrok slobrok, String slobrokId, Protocol protocol, String identity) {
- this.slobrok = slobrok;
- this.slobrokId = slobrok != null ? slobrok.configId() : slobrokId;
+ private RemoteServer(Protocol protocol, String identity) {
+ this.slobrok = newSlobrok();
mbus = new MessageBus(new RPCNetwork(new RPCNetworkParams()
- .setSlobrokConfigId(this.slobrokId)
+ .setSlobroksConfig(slobroksConfig())
.setIdentity(new Identity(identity))),
new MessageBusParams().addProtocol(protocol));
session = mbus.createDestinationSession(new DestinationSessionParams().setMessageHandler(queue));
@@ -48,32 +53,22 @@ public class RemoteServer {
session.reply(reply);
}
- public String slobrokId() {
- return slobrokId;
+ public SlobroksConfig slobroksConfig() {
+ return TestUtils.configFor(slobrok);
}
public void close() {
session.destroy();
mbus.destroy();
- if (slobrok != null) {
- slobrok.stop();
- }
+ slobrok.stop();
}
public static RemoteServer newInstanceWithInternSlobrok() {
- return new RemoteServer(newSlobrok(), null, new SimpleProtocol(), "remote");
- }
-
- public static RemoteServer newInstanceWithExternSlobrok(String slobrokId) {
- return new RemoteServer(null, slobrokId, new SimpleProtocol(), "remote");
- }
-
- public static RemoteServer newInstance(String slobrokId, String identity, Protocol protocol) {
- return new RemoteServer(null, slobrokId, protocol, identity);
+ return new RemoteServer(new SimpleProtocol(), "remote");
}
- public static RemoteServer newInstanceWithProtocol(Protocol protocol) {
- return new RemoteServer(newSlobrok(), null, protocol, "remote");
+ public static RemoteServer newInstance(String identity, Protocol protocol) {
+ return new RemoteServer(protocol, identity);
}
private static Slobrok newSlobrok() {
diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/ServerTestDriver.java b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/ServerTestDriver.java
index e59db28e886..fa0dd37ed13 100644
--- a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/ServerTestDriver.java
+++ b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/ServerTestDriver.java
@@ -42,7 +42,7 @@ public class ServerTestDriver {
}
MessageBusParams mbusParams = new MessageBusParams().addProtocol(protocol);
- RPCNetworkParams netParams = new RPCNetworkParams().setSlobrokConfigId(client.slobrokId());
+ RPCNetworkParams netParams = new RPCNetworkParams().setSlobroksConfig(client.slobroksConfig());
SharedMessageBus mbus = SharedMessageBus.newInstance(mbusParams, netParams);
ServerSession session = mbus.newDestinationSession(new DestinationSessionParams());
server = new MbusServer(driver, session);
@@ -130,13 +130,6 @@ public class ServerTestDriver {
guiceModules);
}
- public static ServerTestDriver newInstanceWithExternSlobrok(String slobrokId, RequestHandler requestHandler,
- boolean network, Module... guiceModules)
- {
- return new ServerTestDriver(RemoteClient.newInstanceWithExternSlobrok(slobrokId, network),
- true, requestHandler, new SimpleProtocol(), guiceModules);
- }
-
public static ServerTestDriver newInactiveInstance(boolean network, Module... guiceModules) {
return new ServerTestDriver(RemoteClient.newInstanceWithInternSlobrok(network), false, null,
new SimpleProtocol(), guiceModules);
diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/TestUtils.java b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/TestUtils.java
new file mode 100644
index 00000000000..85ad241259f
--- /dev/null
+++ b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/TestUtils.java
@@ -0,0 +1,21 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.jdisc.test;
+
+import com.yahoo.cloud.config.SlobroksConfig;
+import com.yahoo.jrt.Spec;
+import com.yahoo.jrt.slobrok.server.Slobrok;
+
+/**
+ * @author jonmv
+ */
+public class TestUtils {
+
+ private TestUtils() { }
+
+ public static SlobroksConfig configFor(Slobrok slobrok) {
+ return new SlobroksConfig.Builder().slobrok(new SlobroksConfig.Slobrok.Builder()
+ .connectionspec(new Spec("localhost", slobrok.port()).toString()))
+ .build();
+ }
+
+}
diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/shared/NullNetwork.java b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/NullNetwork.java
index ad58d6b9a5e..a7cf5fec7d1 100644
--- a/container-messagebus/src/main/java/com/yahoo/messagebus/shared/NullNetwork.java
+++ b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/NullNetwork.java
@@ -12,9 +12,9 @@ import java.util.List;
/**
* <p>Used by SharedMessageBus as a network when the container runs in LocalApplication with no network services.</p>
*
- * @author <a href="mailto:vegardh@yahoo-inc.com">Vegard Havdal</a>
+ * @author Vegard Havdal
*/
-class NullNetwork implements Network {
+public class NullNetwork implements Network {
@Override
public boolean waitUntilReady(double seconds) {
diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/shared/ServerSession.java b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/ServerSession.java
index 56713815c7a..4cd4a776292 100644
--- a/container-messagebus/src/main/java/com/yahoo/messagebus/shared/ServerSession.java
+++ b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/ServerSession.java
@@ -10,13 +10,16 @@ import com.yahoo.messagebus.Reply;
*/
public interface ServerSession extends SharedResource {
- public MessageHandler getMessageHandler();
+ MessageHandler getMessageHandler();
- public void setMessageHandler(MessageHandler msgHandler);
+ void setMessageHandler(MessageHandler msgHandler);
- public void sendReply(Reply reply);
+ void sendReply(Reply reply);
- public String connectionSpec();
+ String connectionSpec();
+
+ String name();
+
+ void close();
- public String name();
}
diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedDestinationSession.java b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedDestinationSession.java
index 7da164757cd..5a9cd39c5b4 100644
--- a/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedDestinationSession.java
+++ b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedDestinationSession.java
@@ -77,6 +77,11 @@ public class SharedDestinationSession extends AbstractResource implements Messag
}
@Override
+ public void close() {
+ session.destroy();
+ }
+
+ @Override
protected void destroy() {
log.log(Level.FINE, "Destroying shared destination session.");
session.destroy();
diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedIntermediateSession.java b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedIntermediateSession.java
index 5c9fab46e34..64cc1aaf510 100644
--- a/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedIntermediateSession.java
+++ b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedIntermediateSession.java
@@ -96,6 +96,11 @@ public class SharedIntermediateSession extends AbstractResource
}
@Override
+ public void close() {
+ session.destroy();
+ }
+
+ @Override
protected void destroy() {
log.log(Level.FINE, "Destroying shared intermediate session.");
session.destroy();
diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedMessageBus.java b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedMessageBus.java
index dd135a51378..39bd6e86aa2 100644
--- a/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedMessageBus.java
+++ b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedMessageBus.java
@@ -3,7 +3,10 @@ package com.yahoo.messagebus.shared;
import com.yahoo.config.subscription.ConfigGetter;
import com.yahoo.jdisc.AbstractResource;
+
+import java.util.Objects;
import java.util.logging.Level;
+
import com.yahoo.messagebus.DestinationSessionParams;
import com.yahoo.messagebus.IntermediateSessionParams;
import com.yahoo.messagebus.MessageBus;
@@ -25,8 +28,7 @@ public class SharedMessageBus extends AbstractResource {
private final MessageBus mbus;
public SharedMessageBus(MessageBus mbus) {
- mbus.getClass(); // throws NullPointerException
- this.mbus = mbus;
+ this.mbus = Objects.requireNonNull(mbus);
}
public MessageBus messageBus() {
@@ -65,4 +67,6 @@ public class SharedMessageBus extends AbstractResource {
}
return new RPCNetwork(params);
}
+
+
}
diff --git a/container-messagebus/src/test/java/com/yahoo/container/jdisc/messagebus/MbusClientProviderTest.java b/container-messagebus/src/test/java/com/yahoo/container/jdisc/messagebus/MbusClientProviderTest.java
index 6335cf01d8c..2a1c4e9a15a 100644
--- a/container-messagebus/src/test/java/com/yahoo/container/jdisc/messagebus/MbusClientProviderTest.java
+++ b/container-messagebus/src/test/java/com/yahoo/container/jdisc/messagebus/MbusClientProviderTest.java
@@ -1,9 +1,15 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.container.jdisc.messagebus;
+import com.yahoo.container.jdisc.ContainerMbusConfig;
import com.yahoo.container.jdisc.config.SessionConfig;
-import com.yahoo.container.jdisc.messagebus.MbusClientProvider;
-import com.yahoo.container.jdisc.messagebus.SessionCache;
+import com.yahoo.document.config.DocumentmanagerConfig;
+import com.yahoo.documentapi.messagebus.protocol.DocumentProtocolPoliciesConfig;
+import com.yahoo.messagebus.MessagebusConfig;
+import com.yahoo.messagebus.network.NetworkMultiplexer;
+import com.yahoo.messagebus.shared.NullNetwork;
+import com.yahoo.vespa.config.content.DistributionConfig;
+import com.yahoo.vespa.config.content.LoadTypeConfig;
import org.junit.Test;
import static org.junit.Assert.assertNotNull;
@@ -30,7 +36,14 @@ public class MbusClientProviderTest {
}
private void testClient(SessionConfig config) {
- MbusClientProvider p = new MbusClientProvider(new SessionCache("dir:src/test/resources/config/clientprovider"), config);
+ SessionCache cache = new SessionCache(NetworkMultiplexer.dedicated(new NullNetwork()),
+ new ContainerMbusConfig.Builder().build(),
+ new DocumentmanagerConfig.Builder().build(),
+ new LoadTypeConfig.Builder().build(),
+ new MessagebusConfig.Builder().build(),
+ new DocumentProtocolPoliciesConfig.Builder().build(),
+ new DistributionConfig.Builder().build());
+ MbusClientProvider p = new MbusClientProvider(cache, config);
assertNotNull(p.get());
p.deconstruct();
}
diff --git a/container-messagebus/src/test/java/com/yahoo/container/jdisc/messagebus/MbusSessionKeyTestCase.java b/container-messagebus/src/test/java/com/yahoo/container/jdisc/messagebus/MbusSessionKeyTestCase.java
index 89bc5b9cecd..709f78f5cf0 100644
--- a/container-messagebus/src/test/java/com/yahoo/container/jdisc/messagebus/MbusSessionKeyTestCase.java
+++ b/container-messagebus/src/test/java/com/yahoo/container/jdisc/messagebus/MbusSessionKeyTestCase.java
@@ -17,7 +17,7 @@ import static org.junit.Assert.assertTrue;
/**
* Check the completeness of the mbus session key classes.
*
- * @author <a href="mailto:steinar@yahoo-inc.com">Steinar Knutsen</a>
+ * @author Steinar Knutsen
*/
public class MbusSessionKeyTestCase {
diff --git a/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusServerTestCase.java b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusServerTestCase.java
index 9d45d2e7abf..6ebb41c4ab7 100644
--- a/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusServerTestCase.java
+++ b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusServerTestCase.java
@@ -331,9 +331,7 @@ public class MbusServerTestCase {
int refCount = 1;
@Override
- public void sendReply(Reply reply) {
-
- }
+ public void sendReply(Reply reply) { }
@Override
public MessageHandler getMessageHandler() {
@@ -341,9 +339,7 @@ public class MbusServerTestCase {
}
@Override
- public void setMessageHandler(MessageHandler msgHandler) {
-
- }
+ public void setMessageHandler(MessageHandler msgHandler) { }
@Override
public String connectionSpec() {
@@ -356,14 +352,12 @@ public class MbusServerTestCase {
}
@Override
+ public void close() { }
+
+ @Override
public ResourceReference refer() {
++refCount;
- return new ResourceReference() {
- @Override
- public void close() {
- --refCount;
- }
- };
+ return () -> --refCount;
}
@Override
diff --git a/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/test/ClientTestDriverTestCase.java b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/test/ClientTestDriverTestCase.java
index ef290a070cb..d24acb9d0d8 100644
--- a/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/test/ClientTestDriverTestCase.java
+++ b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/test/ClientTestDriverTestCase.java
@@ -23,10 +23,6 @@ public class ClientTestDriverTestCase {
driver = ClientTestDriver.newInstanceWithProtocol(new SimpleProtocol());
assertNotNull(driver);
assertTrue(driver.close());
-
- Slobrok slobrok = new Slobrok();
- driver = ClientTestDriver.newInstanceWithExternSlobrok(slobrok.configId());
- assertNotNull(driver);
- assertTrue(driver.close());
}
+
}
diff --git a/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/test/ServerTestDriverTestCase.java b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/test/ServerTestDriverTestCase.java
index f6ae2335d12..7b2c1b68549 100644
--- a/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/test/ServerTestDriverTestCase.java
+++ b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/test/ServerTestDriverTestCase.java
@@ -24,11 +24,6 @@ public class ServerTestDriverTestCase {
driver = ServerTestDriver.newInstanceWithProtocol(new SimpleProtocol(), new NonWorkingRequestHandler(), false);
assertNotNull(driver);
assertTrue(driver.close());
-
- Slobrok slobrok = new Slobrok();
- driver = ServerTestDriver.newInstanceWithExternSlobrok(slobrok.configId(), new NonWorkingRequestHandler(), false);
- assertNotNull(driver);
- assertTrue(driver.close());
}
}
diff --git a/container-messagebus/src/test/java/com/yahoo/messagebus/shared/SharedDestinationSessionTestCase.java b/container-messagebus/src/test/java/com/yahoo/messagebus/shared/SharedDestinationSessionTestCase.java
index 78e79da4b9f..759509946ce 100644
--- a/container-messagebus/src/test/java/com/yahoo/messagebus/shared/SharedDestinationSessionTestCase.java
+++ b/container-messagebus/src/test/java/com/yahoo/messagebus/shared/SharedDestinationSessionTestCase.java
@@ -1,12 +1,14 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.messagebus.shared;
+import com.yahoo.cloud.config.SlobroksConfig;
import com.yahoo.jrt.ListenFailedException;
import com.yahoo.jrt.slobrok.server.Slobrok;
import com.yahoo.messagebus.*;
import com.yahoo.messagebus.jdisc.test.MessageQueue;
import com.yahoo.messagebus.jdisc.test.RemoteClient;
import com.yahoo.messagebus.jdisc.test.ReplyQueue;
+import com.yahoo.messagebus.jdisc.test.TestUtils;
import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
import com.yahoo.messagebus.routing.Route;
import com.yahoo.messagebus.test.SimpleMessage;
@@ -98,7 +100,7 @@ public class SharedDestinationSessionTestCase {
RemoteClient client = RemoteClient.newInstanceWithInternSlobrok(true);
MessageQueue queue = new MessageQueue();
DestinationSessionParams params = new DestinationSessionParams().setMessageHandler(queue);
- SharedDestinationSession session = newDestinationSession(client.slobrokId(), params);
+ SharedDestinationSession session = newDestinationSession(client.slobroksConfig(), params);
Route route = Route.parse(session.connectionSpec());
assertTrue(client.sendMessage(new SimpleMessage("foo").setRoute(route)).isAccepted());
@@ -120,11 +122,11 @@ public class SharedDestinationSessionTestCase {
} catch (ListenFailedException e) {
fail();
}
- return newDestinationSession(slobrok.configId(), new DestinationSessionParams());
+ return newDestinationSession(TestUtils.configFor(slobrok), new DestinationSessionParams());
}
- private static SharedDestinationSession newDestinationSession(String slobrokId, DestinationSessionParams params) {
- RPCNetworkParams netParams = new RPCNetworkParams().setSlobrokConfigId(slobrokId);
+ private static SharedDestinationSession newDestinationSession(SlobroksConfig slobroksConfig, DestinationSessionParams params) {
+ RPCNetworkParams netParams = new RPCNetworkParams().setSlobroksConfig(slobroksConfig);
MessageBusParams mbusParams = new MessageBusParams().addProtocol(new SimpleProtocol());
SharedMessageBus mbus = SharedMessageBus.newInstance(mbusParams, netParams);
SharedDestinationSession session = mbus.newDestinationSession(params);
diff --git a/container-messagebus/src/test/java/com/yahoo/messagebus/shared/SharedIntermediateSessionTestCase.java b/container-messagebus/src/test/java/com/yahoo/messagebus/shared/SharedIntermediateSessionTestCase.java
index 87958415149..ca8301d1ea9 100644
--- a/container-messagebus/src/test/java/com/yahoo/messagebus/shared/SharedIntermediateSessionTestCase.java
+++ b/container-messagebus/src/test/java/com/yahoo/messagebus/shared/SharedIntermediateSessionTestCase.java
@@ -1,6 +1,7 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.messagebus.shared;
+import com.yahoo.cloud.config.SlobroksConfig;
import com.yahoo.jrt.ListenFailedException;
import com.yahoo.jrt.slobrok.server.Slobrok;
import com.yahoo.messagebus.*;
@@ -8,6 +9,7 @@ import com.yahoo.messagebus.jdisc.test.MessageQueue;
import com.yahoo.messagebus.jdisc.test.RemoteClient;
import com.yahoo.messagebus.jdisc.test.RemoteServer;
import com.yahoo.messagebus.jdisc.test.ReplyQueue;
+import com.yahoo.messagebus.jdisc.test.TestUtils;
import com.yahoo.messagebus.network.local.LocalNetwork;
import com.yahoo.messagebus.network.local.LocalWire;
import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
@@ -77,7 +79,7 @@ public class SharedIntermediateSessionTestCase {
public void requireThatReplyHandlerCanNotBeSet() throws ListenFailedException {
Slobrok slobrok = new Slobrok();
try {
- newIntermediateSession(slobrok.configId(),
+ newIntermediateSession(TestUtils.configFor(slobrok),
new IntermediateSessionParams().setReplyHandler(new ReplyQueue()),
false);
fail();
@@ -111,7 +113,7 @@ public class SharedIntermediateSessionTestCase {
@Test
public void requireThatSessionCanSendMessage() throws InterruptedException {
RemoteServer server = RemoteServer.newInstanceWithInternSlobrok();
- SharedIntermediateSession session = newIntermediateSession(server.slobrokId(),
+ SharedIntermediateSession session = newIntermediateSession(server.slobroksConfig(),
new IntermediateSessionParams(),
true);
ReplyQueue queue = new ReplyQueue();
@@ -134,7 +136,7 @@ public class SharedIntermediateSessionTestCase {
RemoteClient client = RemoteClient.newInstanceWithInternSlobrok(true);
MessageQueue queue = new MessageQueue();
IntermediateSessionParams params = new IntermediateSessionParams().setMessageHandler(queue);
- SharedIntermediateSession session = newIntermediateSession(client.slobrokId(), params, true);
+ SharedIntermediateSession session = newIntermediateSession(client.slobroksConfig(), params, true);
Route route = Route.parse(session.connectionSpec());
assertTrue(client.sendMessage(new SimpleMessage("foo").setRoute(route)).isAccepted());
@@ -156,13 +158,13 @@ public class SharedIntermediateSessionTestCase {
} catch (ListenFailedException e) {
fail();
}
- return newIntermediateSession(slobrok.configId(), new IntermediateSessionParams(), network);
+ return newIntermediateSession(TestUtils.configFor(slobrok), new IntermediateSessionParams(), network);
}
- private static SharedIntermediateSession newIntermediateSession(String slobrokId,
+ private static SharedIntermediateSession newIntermediateSession(SlobroksConfig slobroksConfig,
IntermediateSessionParams params,
boolean network) {
- RPCNetworkParams netParams = new RPCNetworkParams().setSlobrokConfigId(slobrokId);
+ RPCNetworkParams netParams = new RPCNetworkParams().setSlobroksConfig(slobroksConfig);
MessageBusParams mbusParams = new MessageBusParams().addProtocol(new SimpleProtocol());
SharedMessageBus mbus = network
? SharedMessageBus.newInstance(mbusParams, netParams)
diff --git a/container-messagebus/src/test/java/com/yahoo/messagebus/shared/SharedSourceSessionTestCase.java b/container-messagebus/src/test/java/com/yahoo/messagebus/shared/SharedSourceSessionTestCase.java
index 1f0966fc961..7a4a3d17170 100644
--- a/container-messagebus/src/test/java/com/yahoo/messagebus/shared/SharedSourceSessionTestCase.java
+++ b/container-messagebus/src/test/java/com/yahoo/messagebus/shared/SharedSourceSessionTestCase.java
@@ -1,6 +1,7 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.messagebus.shared;
+import com.yahoo.cloud.config.SlobroksConfig;
import com.yahoo.jrt.ListenFailedException;
import com.yahoo.jrt.slobrok.server.Slobrok;
import com.yahoo.messagebus.Message;
@@ -8,6 +9,7 @@ import com.yahoo.messagebus.MessageBusParams;
import com.yahoo.messagebus.SourceSessionParams;
import com.yahoo.messagebus.jdisc.test.RemoteServer;
import com.yahoo.messagebus.jdisc.test.ReplyQueue;
+import com.yahoo.messagebus.jdisc.test.TestUtils;
import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
import com.yahoo.messagebus.routing.Route;
import com.yahoo.messagebus.test.SimpleMessage;
@@ -59,7 +61,7 @@ public class SharedSourceSessionTestCase {
@Test
public void requireThatSessionCanSendMessage() throws InterruptedException {
RemoteServer server = RemoteServer.newInstanceWithInternSlobrok();
- SharedSourceSession session = newSourceSession(server.slobrokId(),
+ SharedSourceSession session = newSourceSession(server.slobroksConfig(),
new SourceSessionParams());
ReplyQueue queue = new ReplyQueue();
Message msg = new SimpleMessage("foo").setRoute(Route.parse(server.connectionSpec()));
@@ -80,11 +82,11 @@ public class SharedSourceSessionTestCase {
} catch (ListenFailedException e) {
fail();
}
- return newSourceSession(slobrok.configId(), params);
+ return newSourceSession(TestUtils.configFor(slobrok), params);
}
- private static SharedSourceSession newSourceSession(String slobrokId, SourceSessionParams params) {
- RPCNetworkParams netParams = new RPCNetworkParams().setSlobrokConfigId(slobrokId);
+ private static SharedSourceSession newSourceSession(SlobroksConfig slobroksConfig, SourceSessionParams params) {
+ RPCNetworkParams netParams = new RPCNetworkParams().setSlobroksConfig(slobroksConfig);
MessageBusParams mbusParams = new MessageBusParams().addProtocol(new SimpleProtocol());
SharedMessageBus mbus = SharedMessageBus.newInstance(mbusParams, netParams);
SharedSourceSession session = mbus.newSourceSession(params);
diff --git a/container-messagebus/src/test/resources/config/clientprovider/container-mbus.cfg b/container-messagebus/src/test/resources/config/clientprovider/container-mbus.cfg
deleted file mode 100644
index e69de29bb2d..00000000000
--- a/container-messagebus/src/test/resources/config/clientprovider/container-mbus.cfg
+++ /dev/null
diff --git a/container-messagebus/src/test/resources/config/clientprovider/documentmanager.cfg b/container-messagebus/src/test/resources/config/clientprovider/documentmanager.cfg
deleted file mode 100644
index e69de29bb2d..00000000000
--- a/container-messagebus/src/test/resources/config/clientprovider/documentmanager.cfg
+++ /dev/null
diff --git a/container-messagebus/src/test/resources/config/clientprovider/load-type.cfg b/container-messagebus/src/test/resources/config/clientprovider/load-type.cfg
deleted file mode 100644
index e69de29bb2d..00000000000
--- a/container-messagebus/src/test/resources/config/clientprovider/load-type.cfg
+++ /dev/null
diff --git a/container-messagebus/src/test/resources/config/clientprovider/messagebus.cfg b/container-messagebus/src/test/resources/config/clientprovider/messagebus.cfg
deleted file mode 100644
index e69de29bb2d..00000000000
--- a/container-messagebus/src/test/resources/config/clientprovider/messagebus.cfg
+++ /dev/null
diff --git a/container-messagebus/src/test/resources/config/clientprovider/slobroks.cfg b/container-messagebus/src/test/resources/config/clientprovider/slobroks.cfg
deleted file mode 100644
index e69de29bb2d..00000000000
--- a/container-messagebus/src/test/resources/config/clientprovider/slobroks.cfg
+++ /dev/null
diff --git a/docproc/src/test/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerAllMessageTypesTestCase.java b/docproc/src/test/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerAllMessageTypesTestCase.java
index c508e7da61d..268a40f04c9 100644
--- a/docproc/src/test/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerAllMessageTypesTestCase.java
+++ b/docproc/src/test/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerAllMessageTypesTestCase.java
@@ -36,7 +36,7 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertNotNull;
/**
- * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a>
+ * @author Einar M R Rosenvinge
*/
public class DocumentProcessingHandlerAllMessageTypesTestCase extends DocumentProcessingHandlerTestBase {
diff --git a/docproc/src/test/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerTestBase.java b/docproc/src/test/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerTestBase.java
index 05009f484c8..a2b60bb779c 100644
--- a/docproc/src/test/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerTestBase.java
+++ b/docproc/src/test/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerTestBase.java
@@ -1,11 +1,14 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.docproc.jdisc;
+import com.yahoo.cloud.config.SlobroksConfig;
import com.yahoo.collections.Pair;
import com.yahoo.component.ComponentId;
import com.yahoo.component.provider.ComponentRegistry;
import com.yahoo.container.core.document.ContainerDocumentConfig;
+import com.yahoo.container.jdisc.ContainerMbusConfig;
import com.yahoo.container.jdisc.messagebus.MbusServerProvider;
+import com.yahoo.container.jdisc.messagebus.NetworkMultiplexerProvider;
import com.yahoo.container.jdisc.messagebus.SessionCache;
import com.yahoo.docproc.CallStack;
import com.yahoo.docproc.DocprocService;
@@ -13,19 +16,26 @@ import com.yahoo.docproc.jdisc.messagebus.MbusRequestContext;
import com.yahoo.document.DocumentType;
import com.yahoo.document.DocumentTypeManager;
+import com.yahoo.document.config.DocumentmanagerConfig;
import com.yahoo.documentapi.messagebus.loadtypes.LoadType;
import com.yahoo.documentapi.messagebus.protocol.DocumentMessage;
import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
+import com.yahoo.documentapi.messagebus.protocol.DocumentProtocolPoliciesConfig;
import com.yahoo.jdisc.AbstractResource;
import com.yahoo.jdisc.ReferencedResource;
import com.yahoo.jdisc.application.ContainerBuilder;
+import com.yahoo.messagebus.MessagebusConfig;
import com.yahoo.messagebus.Protocol;
import com.yahoo.messagebus.SourceSessionParams;
import com.yahoo.messagebus.jdisc.MbusClient;
import com.yahoo.messagebus.jdisc.test.RemoteServer;
import com.yahoo.messagebus.jdisc.test.ServerTestDriver;
+import com.yahoo.messagebus.network.NetworkMultiplexer;
+import com.yahoo.messagebus.network.rpc.RPCNetwork;
import com.yahoo.messagebus.routing.Route;
import com.yahoo.messagebus.shared.SharedSourceSession;
+import com.yahoo.vespa.config.content.DistributionConfig;
+import com.yahoo.vespa.config.content.LoadTypeConfig;
import org.junit.After;
import org.junit.Before;
@@ -52,8 +62,13 @@ public abstract class DocumentProcessingHandlerTestBase {
driver = ServerTestDriver.newInactiveInstanceWithProtocol(protocol, true);
- sessionCache =
- new SessionCache("raw:", driver.client().slobrokId(), "test", "raw:", null, "raw:", documentTypeManager);
+ RPCNetwork net = new RPCNetwork(NetworkMultiplexerProvider.asParameters(new ContainerMbusConfig.Builder().build(),
+ driver.client().slobroksConfig(),
+ "test"));
+ sessionCache = new SessionCache(NetworkMultiplexer.dedicated(net),
+ new ContainerMbusConfig.Builder().build(),
+ new MessagebusConfig.Builder().build(),
+ protocol);
ContainerBuilder builder = driver.parent().newContainerBuilder();
ComponentRegistry<DocprocService> registry = new ComponentRegistry<>();
@@ -102,7 +117,7 @@ public abstract class DocumentProcessingHandlerTestBase {
resource.release();
}
- remoteServer = RemoteServer.newInstance(driver.client().slobrokId(), "foobar", protocol);
+ remoteServer = RemoteServer.newInstance("foobar", protocol);
}
@After
diff --git a/messagebus/abi-spec.json b/messagebus/abi-spec.json
index f3e7c5664d6..85c1eaa4f09 100644
--- a/messagebus/abi-spec.json
+++ b/messagebus/abi-spec.json
@@ -311,6 +311,7 @@
"public void register(com.yahoo.messagebus.MessageBus$SendBlockedMessages)",
"public void <init>(com.yahoo.messagebus.network.Network, java.util.List)",
"public void <init>(com.yahoo.messagebus.network.Network, com.yahoo.messagebus.MessageBusParams)",
+ "public void <init>(com.yahoo.messagebus.network.NetworkMultiplexer, com.yahoo.messagebus.MessageBusParams)",
"public boolean destroy()",
"public void sync()",
"public com.yahoo.messagebus.SourceSession createSourceSession(com.yahoo.messagebus.ReplyHandler)",
@@ -326,7 +327,6 @@
"public void deliverMessage(com.yahoo.messagebus.Message, java.lang.String)",
"public void putProtocol(com.yahoo.messagebus.Protocol)",
"public com.yahoo.messagebus.Protocol getProtocol(com.yahoo.text.Utf8Array)",
- "public com.yahoo.messagebus.Protocol getProtocol(com.yahoo.text.Utf8String)",
"public void deliverReply(com.yahoo.messagebus.Reply, com.yahoo.messagebus.ReplyHandler)",
"public void setupRouting(com.yahoo.messagebus.routing.RoutingSpec)",
"public com.yahoo.messagebus.routing.Resender getResender()",
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java b/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java
index 99d9a6d6483..4b674b86aeb 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java
@@ -1,10 +1,10 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.messagebus;
-import com.yahoo.concurrent.CopyOnWriteHashMap;
import com.yahoo.concurrent.SystemTimer;
import java.util.logging.Level;
import com.yahoo.messagebus.network.Network;
+import com.yahoo.messagebus.network.NetworkMultiplexer;
import com.yahoo.messagebus.network.NetworkOwner;
import com.yahoo.messagebus.routing.Resender;
import com.yahoo.messagebus.routing.RetryPolicy;
@@ -64,7 +64,7 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler,
private final ProtocolRepository protocolRepository = new ProtocolRepository();
private final AtomicReference<Map<String, RoutingTable>> tablesRef = new AtomicReference<>(null);
private final Map<String, MessageHandler> sessions = new ConcurrentHashMap<>();
- private final Network net;
+ private final NetworkMultiplexer net;
private final Messenger msn;
private final Resender resender;
private int maxPendingCount;
@@ -117,13 +117,25 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler,
/**
* <p>Constructs an instance of message bus. This requires a network object
- * that it will associate with. This assignment may not change during the
- * lifetime of this message bus.</p>
+ * that it will associate with. This assignment may not change during the lifetime
+ * of this message bus, and this bus will be the single owner of this net.</p>
*
* @param net The network to associate with.
* @param params The parameters that controls this bus.
*/
public MessageBus(Network net, MessageBusParams params) {
+ this(NetworkMultiplexer.dedicated(net), params);
+ }
+
+ /**
+ * <p>Constructs an instance of message bus. This requires a network multiplexer
+ * that it will associate with. This assignment may not change during the
+ * lifetime of this message bus.</p>
+ *
+ * @param net The network multiplexer to associate with.
+ * @param params The parameters that controls this bus.
+ */
+ public MessageBus(NetworkMultiplexer net, MessageBusParams params) {
// Add all known protocols to the repository.
maxPendingCount = params.getMaxPendingCount();
maxPendingSize = params.getMaxPendingSize();
@@ -134,7 +146,7 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler,
// Attach and start network.
this.net = net;
net.attach(this);
- if ( ! net.waitUntilReady(120))
+ if ( ! net.net().waitUntilReady(120))
throw new IllegalStateException("Network failed to become ready in time.");
// Start messenger.
@@ -167,7 +179,7 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler,
careTaker.join();
} catch (InterruptedException e) { }
protocolRepository.clearPolicyCache();
- net.shutdown();
+ net.detach(this);
msn.destroy();
if (resender != null) {
resender.destroy();
@@ -186,7 +198,7 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler,
*/
public void sync() {
msn.sync();
- net.sync();
+ net.net().sync();
}
/**
@@ -267,9 +279,7 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler,
}
IntermediateSession session = new IntermediateSession(this, params);
sessions.put(params.getName(), session);
- if (params.getBroadcastName()) {
- net.registerSession(params.getName());
- }
+ net.registerSession(params.getName(), this, params.getBroadcastName());
return session;
}
@@ -310,9 +320,7 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler,
}
DestinationSession session = new DestinationSession(this, params);
sessions.put(params.getName(), session);
- if (params.getBroadcastName()) {
- net.registerSession(params.getName());
- }
+ net.registerSession(params.getName(), this, params.getBroadcastName());
return session;
}
@@ -325,9 +333,7 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler,
* @param broadcastName Whether or not session name was broadcast.
*/
public synchronized void unregisterSession(String name, boolean broadcastName) {
- if (broadcastName) {
- net.unregisterSession(name);
- }
+ net.unregisterSession(name, this, broadcastName);
sessions.remove(name);
}
@@ -371,7 +377,7 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler,
deliverError(msg, ErrorCode.SEQUENCE_ERROR, "Bucket sequences not supported when resender is enabled.");
return;
}
- SendProxy proxy = new SendProxy(this, net, resender);
+ SendProxy proxy = new SendProxy(this, net.net(), resender);
msn.deliverMessage(msg, proxy);
}
@@ -396,7 +402,7 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler,
if (msgHandler == null) {
deliverError(msg, ErrorCode.UNKNOWN_SESSION, "Session '" + session + "' does not exist.");
} else if (!checkPending(msg)) {
- deliverError(msg, ErrorCode.SESSION_BUSY, "Session '" + net.getConnectionSpec() + "/" + session +
+ deliverError(msg, ErrorCode.SESSION_BUSY, "Session '" + net.net().getConnectionSpec() + "/" + session +
"' is busy, try again later.");
} else {
msn.deliverMessage(msg, msgHandler);
@@ -419,11 +425,6 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler,
return protocolRepository.getProtocol(name.toString());
}
- public Protocol getProtocol(Utf8String name) {
- return getProtocol((Utf8Array)name);
- }
-
- @Override
public void deliverReply(Reply reply, ReplyHandler handler) {
msn.deliverReply(reply, handler);
}
@@ -569,7 +570,7 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler,
* @return The connection string.
*/
public String getConnectionSpec() {
- return net.getConnectionSpec();
+ return net.net().getConnectionSpec();
}
/**
@@ -608,5 +609,5 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler,
}
}
-}
+}
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/Messenger.java b/messagebus/src/main/java/com/yahoo/messagebus/Messenger.java
index 0cc8310937b..871a8f2a0b4 100755
--- a/messagebus/src/main/java/com/yahoo/messagebus/Messenger.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/Messenger.java
@@ -128,7 +128,7 @@ public class Messenger implements Runnable {
*/
public boolean destroy() {
boolean done = false;
- enqueue(Terminate.INSTANCE);
+ enqueue(TERMINATE);
if (!destroyed.getAndSet(true)) {
try {
synchronized (this) {
@@ -161,7 +161,7 @@ public class Messenger implements Runnable {
task = queue.poll();
}
}
- if (task == Terminate.INSTANCE) {
+ if (task == TERMINATE) {
break;
}
if (task != null) {
@@ -175,7 +175,7 @@ public class Messenger implements Runnable {
} catch (final Exception e) {
log.warning("An exception was thrown while destroying " + task.getClass().getName() + ": " +
e.toString());
- log.warning("Someone, somewhere might have to wait indefinetly for something.");
+ log.warning("Someone, somewhere might have to wait indefinitely for something.");
}
}
for (final Task child : children) {
@@ -235,18 +235,9 @@ public class Messenger implements Runnable {
}
}
- private static class Terminate implements Task {
+ private static final Task TERMINATE = new Task() {
+ @Override public void run() { }
+ @Override public void destroy() { }
+ };
- static final Terminate INSTANCE = new Terminate();
-
- @Override
- public void run() {
- // empty
- }
-
- @Override
- public void destroy() {
- // empty
- }
- }
}
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/NetworkMultiplexer.java b/messagebus/src/main/java/com/yahoo/messagebus/network/NetworkMultiplexer.java
new file mode 100644
index 00000000000..5e6a8c7688e
--- /dev/null
+++ b/messagebus/src/main/java/com/yahoo/messagebus/network/NetworkMultiplexer.java
@@ -0,0 +1,143 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.network;
+
+import com.yahoo.messagebus.Message;
+import com.yahoo.messagebus.Protocol;
+import com.yahoo.text.Utf8Array;
+
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.logging.Logger;
+
+/**
+ * A bridge between the reusable, singleton RPC network, and the generational message bus which uses this.
+ * The RPC network is required to be singular because of its unique resources, such as sockets.
+ * This is complicated by the message bus potentially existing in different graph generation at any point in
+ * time, with all copies potential users of the network interface, but where each message bus-registered session
+ * should belong to a single message bus. This class solves these problems by tracking which sessions are
+ * active in which message bus instance, and by (de)registering only when a session is registered to (no) message
+ * bus instances.
+ *
+ * In time, this should allow us to get rid of the shared-this-and-that in the container, too ...
+ *
+ * @author jonmv
+ */
+public class NetworkMultiplexer implements NetworkOwner {
+
+ private static final Logger log = Logger.getLogger(NetworkMultiplexer.class.getName());
+
+ private final Network net;
+ private final Queue<NetworkOwner> owners = new ConcurrentLinkedQueue<>();
+ private final Map<String, Queue<NetworkOwner>> sessions = new ConcurrentHashMap<>();
+ private final boolean shared;
+
+ private NetworkMultiplexer(Network net, boolean shared) {
+ net.attach(this);
+ this.net = net;
+ this.shared = shared;
+ }
+
+ /** Returns a network multiplexer which will be shared between several {@link NetworkOwner}s. */
+ public static NetworkMultiplexer shared(Network net) {
+ return new NetworkMultiplexer(net, true);
+ }
+
+ /** Returns a network multiplexer with a single {@link NetworkOwner}, which shuts down when this owner detaches. */
+ public static NetworkMultiplexer dedicated(Network net) {
+ return new NetworkMultiplexer(net, false);
+ }
+
+ public void registerSession(String session, NetworkOwner owner, boolean broadcast) {
+ sessions.compute(session, (name, owners) -> {
+ if (owners == null) {
+ owners = new ConcurrentLinkedQueue<>();
+ if (broadcast)
+ net.registerSession(session);
+ }
+ else if (owners.contains(owner))
+ throw new IllegalArgumentException("Session '" + session + "' with owner '" + owner + "' already registered with this");
+
+ owners.add(owner);
+ return owners;
+ });
+ }
+
+ public void unregisterSession(String session, NetworkOwner owner, boolean broadcast) {
+ sessions.compute(session, (name, owners) -> {
+ if (owners == null || ! owners.remove(owner))
+ throw new IllegalArgumentException("Session '" + session + "' not registered with owner '" + owner + "'");
+
+ if (owners.isEmpty()) {
+ if (broadcast)
+ net.unregisterSession(session);
+ return null;
+ }
+ return owners;
+ });
+ }
+
+ @Override
+ public Protocol getProtocol(Utf8Array name) {
+ // Should ideally couple this to the actual receiver ...
+ Protocol protocol = null;
+ for (NetworkOwner owner : owners)
+ protocol = owner.getProtocol(name) == null ? protocol : owner.getProtocol(name);
+
+ return protocol;
+ }
+
+ @Override
+ public void deliverMessage(Message message, String session) {
+ // Send to first owner which has registered this session, or fall back to first attached owner (for rejection).
+ NetworkOwner owner = sessions.getOrDefault(session, owners).peek();
+ if (owner == null) { // Should not happen.
+ log.warning(this + " received message '" + message + "' with no owners attached");
+ message.discard();
+ }
+ else
+ owner.deliverMessage(message, session);
+ }
+
+ public void attach(NetworkOwner owner) {
+ if (owners.contains(owner))
+ throw new IllegalArgumentException(owner + " is already attached to this");
+
+ owners.add(owner);
+ }
+
+ public void detach(NetworkOwner owner) {
+ if ( ! owners.remove(owner))
+ throw new IllegalArgumentException(owner + " not attached to this");
+
+ if ( ! shared && owners.isEmpty())
+ net.shutdown();
+ }
+
+ public void destroy() {
+ if ( ! shared)
+ throw new UnsupportedOperationException("Destroy called on a dedicated multiplexer; " +
+ "this automatically shuts down when detached from");
+
+ if ( ! owners.isEmpty())
+ log.warning("NetworkMultiplexer destroyed before all owners detached: " + this);
+
+ net.shutdown();
+ }
+
+ public Network net() {
+ return net;
+ }
+
+ @Override
+ public String toString() {
+ return "NetworkMultiplexer{" +
+ "net=" + net +
+ ", owners=" + owners +
+ ", sessions=" + sessions +
+ ", shared=" + shared +
+ '}';
+ }
+
+} \ No newline at end of file
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/NetworkOwner.java b/messagebus/src/main/java/com/yahoo/messagebus/network/NetworkOwner.java
index 2b55bf5b901..c4185d67251 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/network/NetworkOwner.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/network/NetworkOwner.java
@@ -24,7 +24,7 @@ public interface NetworkOwner {
* @param name The name of the protocol to return.
* @return The named protocol.
*/
- public Protocol getProtocol(Utf8Array name);
+ Protocol getProtocol(Utf8Array name);
/**
* All messages that arrive in the network layer is passed to its owner through this function.
@@ -32,13 +32,6 @@ public interface NetworkOwner {
* @param message The message that just arrived from the network.
* @param session The name of the session that is the recipient of the request.
*/
- public void deliverMessage(Message message, String session);
+ void deliverMessage(Message message, String session);
- /**
- * All replies that arrive in the network layer is passed through this to unentangle it from the network thread.
- *
- * @param reply The reply that just arrived from the network.
- * @param handler The handler that is to receive the reply.
- */
- public void deliverReply(Reply reply, ReplyHandler handler);
}
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalNetwork.java b/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalNetwork.java
index f3c6422fbfd..06b29ed524c 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalNetwork.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalNetwork.java
@@ -115,7 +115,7 @@ public class LocalNetwork implements Network {
error.getMessage(),
error.getService() != null ? error.getService() : envelope.sender.hostId));
}
- owner.deliverReply(reply, envelope.parent.recipient);
+ envelope.parent.recipient.handleReply(reply);
});
}
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java
index 1c41f87d1ee..e95883d8dda 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java
@@ -184,10 +184,8 @@ public class RPCNetwork implements Network, MethodHandler {
}
this.owner = owner;
- RPCSendAdapter adapter1 = new RPCSendV1();
- RPCSendAdapter adapter2 = new RPCSendV2();
- addSendAdapter(new Version(5), adapter1);
- addSendAdapter(new Version(6,149), adapter2);
+ sendAdapters.put(new Version(5), new RPCSendV1(this));
+ sendAdapters.put(new Version(6,149), new RPCSendV2(this));
}
@Override
@@ -327,18 +325,6 @@ public class RPCNetwork implements Network, MethodHandler {
}
/**
- * Registers a send adapter for a given version. This will overwrite whatever is already registered under the same
- * version.
- *
- * @param version the version for which to register an adapter
- * @param adapter the adapter to register
- */
- private void addSendAdapter(Version version, RPCSendAdapter adapter) {
- adapter.attach(this);
- sendAdapters.put(version, adapter);
- }
-
- /**
* Determines and returns the send adapter that is compatible with the given version. If no adapter can be found,
* this method returns null.
*
@@ -362,7 +348,7 @@ public class RPCNetwork implements Network, MethodHandler {
Reply reply = new EmptyReply();
reply.getTrace().setLevel(ctx.traceLevel);
reply.addError(new Error(errCode, errMsg));
- owner.deliverReply(reply, recipient);
+ recipient.handleReply(reply);
}
}
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSend.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSend.java
index 1cc45eeb2d8..3c4e7627fc8 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSend.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSend.java
@@ -18,6 +18,7 @@ import com.yahoo.messagebus.ReplyHandler;
import com.yahoo.messagebus.Routable;
import com.yahoo.messagebus.Trace;
import com.yahoo.messagebus.TraceLevel;
+import com.yahoo.messagebus.network.NetworkOwner;
import com.yahoo.messagebus.routing.Hop;
import com.yahoo.messagebus.routing.Route;
import com.yahoo.messagebus.routing.RoutingNode;
@@ -30,9 +31,9 @@ import com.yahoo.text.Utf8Array;
*/
public abstract class RPCSend implements MethodHandler, ReplyHandler, RequestWaiter, RPCSendAdapter {
- private RPCNetwork net = null;
- private String clientIdent = "client";
- private String serverIdent = "server";
+ private final RPCNetwork net;
+ private final String clientIdent;
+ private final String serverIdent;
protected abstract Method buildMethod();
protected abstract String getReturnSpec();
@@ -41,13 +42,16 @@ public abstract class RPCSend implements MethodHandler, ReplyHandler, RequestWai
protected abstract Reply createReply(Values ret, String serviceName, Trace trace);
protected abstract Params toParams(Values req);
protected abstract void createResponse(Values ret, Reply reply, Version version, byte [] payload);
- @Override
- public final void attach(RPCNetwork net) {
+
+ protected RPCSend(RPCNetwork net) {
this.net = net;
String prefix = net.getIdentity().getServicePrefix();
if (prefix != null && prefix.length() > 0) {
- clientIdent = "'" + prefix + "'";
- serverIdent = clientIdent;
+ this.serverIdent = this.clientIdent = "'" + prefix + "'";
+ }
+ else {
+ this.clientIdent = "client";
+ this.serverIdent = "server";
}
net.getSupervisor().addMethod(buildMethod());
}
@@ -76,7 +80,7 @@ public abstract class RPCSend implements MethodHandler, ReplyHandler, RequestWai
}
Reply reply = new EmptyReply();
reply.getTrace().swap(ctx.trace);
- net.getOwner().deliverReply(reply, recipient);
+ recipient.handleReply(reply);
} else {
req.setContext(ctx);
address.getTarget().getJRTTarget().invokeAsync(req, ctx.timeout, this);
@@ -121,16 +125,16 @@ public abstract class RPCSend implements MethodHandler, ReplyHandler, RequestWai
switch (req.errorCode()) {
case com.yahoo.jrt.ErrorCode.TIMEOUT:
error = new Error(ErrorCode.TIMEOUT,
- "A timeout occured while waiting for '" + serviceName + "' (" +
+ "A timeout occurred while waiting for '" + serviceName + "' (" +
ctx.timeout + " seconds expired); " + req.errorMessage());
break;
case com.yahoo.jrt.ErrorCode.CONNECTION:
error = new Error(ErrorCode.CONNECTION_ERROR,
- "A connection error occured for '" + serviceName + "'; " + req.errorMessage());
+ "A connection error occurred for '" + serviceName + "'; " + req.errorMessage());
break;
default:
error = new Error(ErrorCode.NETWORK_ERROR,
- "A network error occured for '" + serviceName + "'; " + req.errorMessage());
+ "A network error occurred for '" + serviceName + "'; " + req.errorMessage());
}
} else {
reply = createReply(req.returnValues(), serviceName, ctx.trace);
@@ -143,7 +147,7 @@ public abstract class RPCSend implements MethodHandler, ReplyHandler, RequestWai
if (error != null) {
reply.addError(error);
}
- net.getOwner().deliverReply(reply, ctx.recipient);
+ ctx.recipient.handleReply(reply);
}
protected final class Params {
@@ -172,20 +176,20 @@ public abstract class RPCSend implements MethodHandler, ReplyHandler, RequestWai
// Make sure that the owner understands the protocol.
Protocol protocol = net.getOwner().getProtocol(p.protocolName);
if (protocol == null) {
- replyError(request, p.version, p.traceLevel,
+ replyError(request, p.version, protocol, p.traceLevel,
new Error(ErrorCode.UNKNOWN_PROTOCOL,
"Protocol '" + p.protocolName + "' is not known by " + serverIdent + "."));
return;
}
Routable routable = protocol.decode(p.version, p.payload);
if (routable == null) {
- replyError(request, p.version, p.traceLevel,
+ replyError(request, p.version, protocol, p.traceLevel,
new Error(ErrorCode.DECODE_ERROR,
"Protocol '" + protocol.getName() + "' failed to decode routable."));
return;
}
if (routable instanceof Reply) {
- replyError(request, p.version, p.traceLevel,
+ replyError(request, p.version, protocol, p.traceLevel,
new Error(ErrorCode.DECODE_ERROR,
"Payload decoded to a reply when expecting a message."));
return;
@@ -194,7 +198,7 @@ public abstract class RPCSend implements MethodHandler, ReplyHandler, RequestWai
if (p.route != null && p.route.length() > 0) {
msg.setRoute(net.getRoute(p.route));
}
- msg.setContext(new ReplyContext(request, p.version));
+ msg.setContext(new ReplyContext(request, p.version, protocol));
msg.pushHandler(this);
msg.setRetryEnabled(p.retryEnabled);
msg.setRetry(p.retry);
@@ -222,13 +226,12 @@ public abstract class RPCSend implements MethodHandler, ReplyHandler, RequestWai
// Encode and return the reply through the RPC request.
byte[] payload = new byte[0];
if (reply.getType() != 0) {
- Protocol protocol = net.getOwner().getProtocol(reply.getProtocol());
- if (protocol != null) {
- payload = protocol.encode(ctx.version, reply);
+ if (ctx.protocol != null) {
+ payload = ctx.protocol.encode(ctx.version, reply);
}
if (payload == null || payload.length == 0) {
reply.addError(new Error(ErrorCode.ENCODE_ERROR,
- "An error occured while encoding the reply."));
+ "An error occurred while encoding the reply."));
}
}
createResponse(ctx.request.returnValues(), reply, ctx.version, payload);
@@ -241,11 +244,12 @@ public abstract class RPCSend implements MethodHandler, ReplyHandler, RequestWai
* @param request The JRT request to reply to.
* @param version The version to serialize for.
* @param traceLevel The trace level to set in the reply.
+ * @param protocol The message protocol to serialize with.
* @param err The error to reply with.
*/
- private void replyError(Request request, Version version, int traceLevel, Error err) {
+ private void replyError(Request request, Version version, Protocol protocol, int traceLevel, Error err) {
Reply reply = new EmptyReply();
- reply.setContext(new ReplyContext(request, version));
+ reply.setContext(new ReplyContext(request, version, protocol));
reply.getTrace().setLevel(traceLevel);
reply.addError(err);
handleReply(reply);
@@ -268,10 +272,12 @@ public abstract class RPCSend implements MethodHandler, ReplyHandler, RequestWai
final Request request;
final Version version;
+ final Protocol protocol;
- ReplyContext(Request request, Version version) {
+ ReplyContext(Request request, Version version, Protocol protocol) {
this.request = request;
this.version = version;
+ this.protocol = protocol;
}
}
}
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSendAdapter.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSendAdapter.java
index 749ba4f4451..ea6cf59726b 100755
--- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSendAdapter.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSendAdapter.java
@@ -13,13 +13,6 @@ import com.yahoo.messagebus.routing.RoutingNode;
public interface RPCSendAdapter {
/**
- * Attaches this adapter to the given network.
- *
- * @param net The network to attach to.
- */
- void attach(RPCNetwork net);
-
- /**
* Performs the actual sending to the given recipient.
*
* @param recipient The recipient to send to.
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSendV1.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSendV1.java
index ccded0e8d1b..e6d090e3d23 100755
--- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSendV1.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSendV1.java
@@ -33,8 +33,11 @@ public class RPCSendV1 extends RPCSend {
private final String METHOD_PARAMS = "sssbilsxi";
private final String METHOD_RETURN = "sdISSsxs";
+ protected RPCSendV1(RPCNetwork net) { super(net); }
+
@Override
protected String getReturnSpec() { return METHOD_RETURN; }
+
@Override
protected Method buildMethod() {
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSendV2.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSendV2.java
index bb243651447..6ec3ea5ec7d 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSendV2.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSendV2.java
@@ -35,8 +35,11 @@ public class RPCSendV2 extends RPCSend {
private final static String METHOD_RETURN = "bixbix";
private final Compressor compressor = new Compressor(CompressionType.LZ4, 3, 0.90, 1024);
+ protected RPCSendV2(RPCNetwork net) { super(net); }
+
@Override
protected String getReturnSpec() { return METHOD_RETURN; }
+
@Override
protected Method buildMethod() {
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/routing/RoutingNode.java b/messagebus/src/main/java/com/yahoo/messagebus/routing/RoutingNode.java
index 05fc6f62236..42d1ff9ba2d 100755
--- a/messagebus/src/main/java/com/yahoo/messagebus/routing/RoutingNode.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/routing/RoutingNode.java
@@ -805,9 +805,13 @@ public class RoutingNode implements ReplyHandler {
this.serviceAddress = serviceAddress;
}
+ /** Proxy through message bus in case it was destroyed in the meantime. */
@Override
public void handleReply(Reply reply) {
- setReply(reply);
- notifyParent();
+ mbus.deliverReply(reply, r -> {
+ setReply(reply);
+ notifyParent();
+ });
}
+
}
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/network/NetworkMultiplexerTest.java b/messagebus/src/test/java/com/yahoo/messagebus/network/NetworkMultiplexerTest.java
new file mode 100644
index 00000000000..f1d83550601
--- /dev/null
+++ b/messagebus/src/test/java/com/yahoo/messagebus/network/NetworkMultiplexerTest.java
@@ -0,0 +1,188 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.network;
+
+import com.yahoo.jrt.slobrok.api.IMirror;
+import com.yahoo.messagebus.Message;
+import com.yahoo.messagebus.Protocol;
+import com.yahoo.messagebus.routing.RoutingNode;
+import com.yahoo.messagebus.test.SimpleMessage;
+import com.yahoo.messagebus.test.SimpleProtocol;
+import com.yahoo.text.Utf8Array;
+import com.yahoo.text.Utf8String;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * @author jonmv
+ */
+public class NetworkMultiplexerTest {
+
+ @Test
+ public void testShared() {
+ MockNetwork net = new MockNetwork();
+ MockOwner owner1 = new MockOwner();
+ MockOwner owner2 = new MockOwner();
+ NetworkMultiplexer shared = NetworkMultiplexer.shared(net);
+ assertEquals(Set.of(shared), net.attached);
+ assertEquals(Set.of(), net.registered);
+ assertFalse(net.shutDown.get());
+
+ shared.attach(owner1);
+ shared.registerSession("s1", owner1, true);
+ try {
+ shared.registerSession("s1", owner1, true);
+ fail("Illegal to register same session multiple times with the same owner");
+ }
+ catch (IllegalArgumentException expected) {
+ assertEquals("Session 's1' with owner 'mock owner' already registered with this", expected.getMessage());
+ }
+ assertEquals(Set.of("s1"), net.registered);
+
+ shared.attach(owner2);
+ shared.registerSession("s1", owner2, true);
+ shared.registerSession("s2", owner2, true);
+ shared.registerSession("s3", owner2, false);
+ assertEquals(Set.of("s1", "s2"), net.registered);
+
+ Utf8String name = new Utf8String("protocol");
+ Protocol protocol1 = new SimpleProtocol();
+ Protocol protocol2 = new SimpleProtocol();
+ owner1.protocols.put(name, protocol1);
+ assertEquals(protocol1, shared.getProtocol(name));
+ owner2.protocols.put(name, protocol2);
+ assertEquals(protocol2, shared.getProtocol(name));
+
+ Message message1 = new SimpleMessage("one");
+ Message message2 = new SimpleMessage("two");
+ Message message3 = new SimpleMessage("three");
+ Message message4 = new SimpleMessage("four");
+ shared.deliverMessage(message1, "s1");
+ shared.deliverMessage(message2, "s2");
+ shared.unregisterSession("s1", owner1, true);
+ shared.deliverMessage(message3, "s1");
+ shared.deliverMessage(message4, "s3");
+ assertEquals(Map.of("s1", List.of(message1)), owner1.messages);
+ assertEquals(Map.of("s2", List.of(message2), "s1", List.of(message3), "s3", List.of(message4)), owner2.messages);
+
+ shared.detach(owner1);
+ assertEquals(protocol2, shared.getProtocol(name));
+
+ shared.detach(owner2);
+ assertFalse(net.shutDown.get());
+
+ shared.destroy();
+ assertTrue(net.shutDown.get());
+ }
+
+ @Test
+ public void testDedicated() {
+ MockNetwork net = new MockNetwork();
+ MockOwner owner = new MockOwner();
+ NetworkMultiplexer dedicated = NetworkMultiplexer.dedicated(net);
+ assertEquals(Set.of(dedicated), net.attached);
+ assertEquals(Set.of(), net.registered);
+ assertFalse(net.shutDown.get());
+
+ dedicated.attach(owner);
+ dedicated.detach(owner);
+ assertTrue(net.shutDown.get());
+ }
+
+ static class MockOwner implements NetworkOwner {
+
+ final Map<Utf8Array, Protocol> protocols = new HashMap<>();
+ final Map<String, List<Message>> messages = new HashMap<>();
+
+ @Override
+ public Protocol getProtocol(Utf8Array name) {
+ return protocols.get(name);
+ }
+
+ @Override
+ public void deliverMessage(Message message, String session) {
+ messages.computeIfAbsent(session, __ -> new ArrayList<>()).add(message);
+ }
+
+ @Override
+ public String toString() {
+ return "mock owner";
+ }
+
+ }
+
+ static class MockNetwork implements Network {
+
+ final Set<NetworkOwner> attached = new HashSet<>();
+ final Set<String> registered = new HashSet<>();
+ final AtomicBoolean shutDown = new AtomicBoolean();
+
+ @Override
+ public boolean waitUntilReady(double seconds) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void attach(NetworkOwner owner) {
+ assertTrue(attached.add(owner));
+ }
+
+ @Override
+ public void registerSession(String session) {
+ assertTrue(registered.add(session));
+ }
+
+ @Override
+ public void unregisterSession(String session) {
+ assertTrue(registered.remove(session));
+ }
+
+ @Override
+ public boolean allocServiceAddress(RoutingNode recipient) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void freeServiceAddress(RoutingNode recipient) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void send(Message msg, List<RoutingNode> recipients) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void sync() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void shutdown() {
+ assertFalse(shutDown.getAndSet(true));
+ }
+
+ @Override
+ public String getConnectionSpec() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public IMirror getMirror() {
+ throw new UnsupportedOperationException();
+ }
+
+ }
+
+}