aboutsummaryrefslogtreecommitdiffstats
path: root/container-messagebus
diff options
context:
space:
mode:
authorJon Marius Venstad <jonmv@users.noreply.github.com>2021-07-01 17:44:35 +0200
committerGitHub <noreply@github.com>2021-07-01 17:44:35 +0200
commitfd9eac104030565d9f63101fa85acc4e20fccf75 (patch)
tree45b2069dcad3e4e710e50f53fa1eb5c480a05166 /container-messagebus
parent57d72de784e93c39c9f935b461727655fe5b9e58 (diff)
Revert "Use injected config in SessionCache"
Diffstat (limited to 'container-messagebus')
-rw-r--r--container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java114
-rw-r--r--container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/ClientTestDriver.java5
-rw-r--r--container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteClient.java34
-rw-r--r--container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteServer.java39
-rw-r--r--container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/ServerTestDriver.java9
-rw-r--r--container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/TestUtils.java21
-rw-r--r--container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedMessageBus.java5
-rw-r--r--container-messagebus/src/test/java/com/yahoo/container/jdisc/messagebus/MbusClientProviderTest.java17
-rw-r--r--container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/test/ClientTestDriverTestCase.java6
-rw-r--r--container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/test/ServerTestDriverTestCase.java5
-rw-r--r--container-messagebus/src/test/java/com/yahoo/messagebus/shared/SharedDestinationSessionTestCase.java10
-rw-r--r--container-messagebus/src/test/java/com/yahoo/messagebus/shared/SharedIntermediateSessionTestCase.java14
-rw-r--r--container-messagebus/src/test/java/com/yahoo/messagebus/shared/SharedSourceSessionTestCase.java10
-rw-r--r--container-messagebus/src/test/resources/config/clientprovider/container-mbus.cfg0
-rw-r--r--container-messagebus/src/test/resources/config/clientprovider/documentmanager.cfg0
-rw-r--r--container-messagebus/src/test/resources/config/clientprovider/load-type.cfg0
-rw-r--r--container-messagebus/src/test/resources/config/clientprovider/messagebus.cfg0
-rw-r--r--container-messagebus/src/test/resources/config/clientprovider/slobroks.cfg0
18 files changed, 142 insertions, 147 deletions
diff --git a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java
index ab3d09af178..d65b2f7cc12 100644
--- a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java
+++ b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java
@@ -1,24 +1,22 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.container.jdisc.messagebus;
-import com.google.inject.Inject;
-import com.yahoo.cloud.config.SlobroksConfig;
import com.yahoo.component.AbstractComponent;
+import com.yahoo.config.subscription.ConfigGetter;
import com.yahoo.container.jdisc.ContainerMbusConfig;
import com.yahoo.document.DocumentTypeManager;
import com.yahoo.document.DocumentUtil;
-import com.yahoo.document.config.DocumentmanagerConfig;
import com.yahoo.documentapi.messagebus.loadtypes.LoadTypeSet;
import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
-import com.yahoo.documentapi.messagebus.protocol.DocumentProtocolPoliciesConfig;
import com.yahoo.jdisc.ReferencedResource;
import com.yahoo.jdisc.References;
import com.yahoo.jdisc.ResourceReference;
import com.yahoo.jdisc.SharedResource;
+import java.util.logging.Level;
+import com.yahoo.messagebus.ConfigAgent;
import com.yahoo.messagebus.DynamicThrottlePolicy;
import com.yahoo.messagebus.IntermediateSessionParams;
import com.yahoo.messagebus.MessageBusParams;
-import com.yahoo.messagebus.MessagebusConfig;
import com.yahoo.messagebus.Protocol;
import com.yahoo.messagebus.SourceSessionParams;
import com.yahoo.messagebus.StaticThrottlePolicy;
@@ -28,12 +26,9 @@ import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
import com.yahoo.messagebus.shared.SharedIntermediateSession;
import com.yahoo.messagebus.shared.SharedMessageBus;
import com.yahoo.messagebus.shared.SharedSourceSession;
-import com.yahoo.vespa.config.content.DistributionConfig;
-import com.yahoo.vespa.config.content.LoadTypeConfig;
import java.util.HashMap;
import java.util.Map;
-import java.util.logging.Level;
import java.util.logging.Logger;
/**
@@ -42,19 +37,24 @@ import java.util.logging.Logger;
* @author Steinar Knutsen
* @author Einar Rosenvinge
*/
-// TODO jonmv: Remove this: only used with more than one entry by FeedHandlerV3, where only timeout varies.
-// rant: This whole construct is because DI at one point didn't exist, so getting hold of a shared resource
-// or session was hard(?), and one resorted to routing through the Container, using URIs, to the correct
-// MbusClient, with or without throttling. This introduced the problem of ownership during shutdown,
-// which was solved with manual reference counting. This is all much better solved with DI, which (now)
-// owns everything, and does component shutdown in reverse construction order, which is always right.
-// So for the sake everyone's mental health, this should all just be removed now! I suspect this is
-// even the case for Request; we can track in handlers, and warn when requests have been misplaced.
+// TODO jonmv: Remove this? Only used sensibly by FeedHandlerV3, where only timeout varies.
public final class SessionCache extends AbstractComponent {
private static final Logger log = Logger.getLogger(SessionCache.class.getName());
- private final SharedMessageBus messageBus;
+ //config
+ private final String messagebusConfigId;
+ private final String slobrokConfigId;
+ private final String identity;
+ private final String containerMbusConfigId;
+ private final String documentManagerConfigId;
+ private final String loadTypeConfigId;
+ private final DocumentTypeManager documentTypeManager;
+
+ // initialized in start()
+ private ConfigAgent configAgent;
+ private SharedMessageBus messageBus;
+
private final Object intermediateLock = new Object();
private final Map<String, SharedIntermediateSession> intermediates = new HashMap<>();
private final IntermediateSessionCreator intermediatesCreator = new IntermediateSessionCreator();
@@ -63,48 +63,52 @@ public final class SessionCache extends AbstractComponent {
private final Map<SourceSessionKey, SharedSourceSession> sources = new HashMap<>();
private final SourceSessionCreator sourcesCreator = new SourceSessionCreator();
- @Inject
- public SessionCache(ContainerMbusConfig containerMbusConfig, DocumentmanagerConfig documentmanagerConfig,
- LoadTypeConfig loadTypeConfig, SlobroksConfig slobroksConfig,
- MessagebusConfig messagebusConfig, DocumentProtocolPoliciesConfig policiesConfig,
- DistributionConfig distributionConfig) {
- this(containerMbusConfig, documentmanagerConfig, loadTypeConfig, slobroksConfig,
- messagebusConfig, policiesConfig, distributionConfig, System.getProperty("config.id")); //:
+ public SessionCache(String messagebusConfigId, String slobrokConfigId, String identity,
+ String containerMbusConfigId, String documentManagerConfigId,
+ String loadTypeConfigId,
+ DocumentTypeManager documentTypeManager) {
+ this.messagebusConfigId = messagebusConfigId;
+ this.slobrokConfigId = slobrokConfigId;
+ this.identity = identity;
+ this.containerMbusConfigId = containerMbusConfigId;
+ this.documentManagerConfigId = documentManagerConfigId;
+ this.loadTypeConfigId = loadTypeConfigId;
+ this.documentTypeManager = documentTypeManager;
}
- public SessionCache(ContainerMbusConfig containerMbusConfig, DocumentmanagerConfig documentmanagerConfig,
- LoadTypeConfig loadTypeConfig, SlobroksConfig slobroksConfig,
- MessagebusConfig messagebusConfig, DocumentProtocolPoliciesConfig policiesConfig,
- DistributionConfig distributionConfig, String identity) {
- this.messageBus = createSharedMessageBus(containerMbusConfig,
- messagebusConfig,
- slobroksConfig,
- identity,
- new DocumentProtocol(new DocumentTypeManager(documentmanagerConfig),
- new LoadTypeSet(loadTypeConfig),
- policiesConfig,
- distributionConfig));
+ public SessionCache(final String identity) {
+ this(identity, identity, identity, identity, identity, identity, new DocumentTypeManager());
}
- public SessionCache(ContainerMbusConfig containerMbusConfig, SlobroksConfig slobroksConfig,
- MessagebusConfig messagebusConfig, String identity, Protocol protocol) {
- this.messageBus = createSharedMessageBus(containerMbusConfig,
- messagebusConfig,
- slobroksConfig,
- identity,
- protocol);
+ public void deconstruct() {
+ if (configAgent != null) {
+ configAgent.shutdown();
+ }
}
- public void deconstruct() {
- messageBus.release();
+
+ private void start() {
+ ContainerMbusConfig mbusConfig = ConfigGetter.getConfig(ContainerMbusConfig.class, containerMbusConfigId);
+ if (documentManagerConfigId != null) {
+ documentTypeManager.configure(documentManagerConfigId);
+ }
+ LoadTypeSet loadTypeSet = new LoadTypeSet(loadTypeConfigId);
+ DocumentProtocol protocol = new DocumentProtocol(documentTypeManager, identity, loadTypeSet);
+ messageBus = createSharedMessageBus(mbusConfig, slobrokConfigId, identity, protocol);
+ // TODO: stop doing subscriptions to config when that is to be solved in slobrok as well
+ configAgent = new ConfigAgent(messagebusConfigId, messageBus.messageBus());
+ configAgent.subscribe();
+ }
+
+
+ private boolean isStarted() {
+ return messageBus != null;
}
private static SharedMessageBus createSharedMessageBus(ContainerMbusConfig mbusConfig,
- MessagebusConfig messagebusConfig,
- SlobroksConfig slobroksConfig, String identity,
+ String slobrokConfigId, String identity,
Protocol protocol) {
- MessageBusParams mbusParams = new MessageBusParams().addProtocol(protocol)
- .setMessageBusConfig(messagebusConfig);
+ MessageBusParams mbusParams = new MessageBusParams().addProtocol(protocol);
int maxPendingSize = DocumentUtil
.calculateMaxPendingSize(mbusConfig.maxConcurrentFactor(), mbusConfig.documentExpansionFactor(),
@@ -115,7 +119,7 @@ public final class SessionCache extends AbstractComponent {
mbusParams.setMaxPendingSize(maxPendingSize);
RPCNetworkParams netParams = new RPCNetworkParams()
- .setSlobroksConfig(slobroksConfig)
+ .setSlobrokConfigId(slobrokConfigId)
.setIdentity(new Identity(identity))
.setListenPort(mbusConfig.port())
.setNumTargetsPerSpec(mbusConfig.numconnectionspertarget())
@@ -140,10 +144,20 @@ public final class SessionCache extends AbstractComponent {
}
ReferencedResource<SharedIntermediateSession> retainIntermediate(final IntermediateSessionParams p) {
+ synchronized (this) {
+ if (!isStarted()) {
+ start();
+ }
+ }
return intermediatesCreator.retain(intermediateLock, intermediates, p);
}
public ReferencedResource<SharedSourceSession> retainSource(final SourceSessionParams p) {
+ synchronized (this) {
+ if (!isStarted()) {
+ start();
+ }
+ }
return sourcesCreator.retain(sourceLock, sources, p);
}
diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/ClientTestDriver.java b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/ClientTestDriver.java
index ea0ed7eadc8..111805d61b0 100644
--- a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/ClientTestDriver.java
+++ b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/ClientTestDriver.java
@@ -33,7 +33,7 @@ public class ClientTestDriver {
this.server = server;
MessageBusParams mbusParams = new MessageBusParams().addProtocol(protocol);
- RPCNetworkParams netParams = new RPCNetworkParams().setSlobroksConfig(server.slobroksConfig());
+ RPCNetworkParams netParams = new RPCNetworkParams().setSlobrokConfigId(server.slobrokId());
SharedMessageBus mbus = SharedMessageBus.newInstance(mbusParams, netParams);
session = mbus.newSourceSession(new SourceSessionParams());
client = new MbusClient(session);
@@ -128,4 +128,7 @@ public class ClientTestDriver {
return new ClientTestDriver(RemoteServer.newInstanceWithInternSlobrok(), protocol);
}
+ public static ClientTestDriver newInstanceWithExternSlobrok(String slobrokId) {
+ return new ClientTestDriver(RemoteServer.newInstanceWithExternSlobrok(slobrokId), new SimpleProtocol());
+ }
}
diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteClient.java b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteClient.java
index 6cd8fb8f34d..57d0abd980b 100644
--- a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteClient.java
+++ b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteClient.java
@@ -1,17 +1,9 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.messagebus.jdisc.test;
-import com.yahoo.cloud.config.SlobroksConfig;
import com.yahoo.jrt.ListenFailedException;
import com.yahoo.jrt.slobrok.server.Slobrok;
-import com.yahoo.messagebus.Message;
-import com.yahoo.messagebus.MessageBus;
-import com.yahoo.messagebus.MessageBusParams;
-import com.yahoo.messagebus.Protocol;
-import com.yahoo.messagebus.Reply;
-import com.yahoo.messagebus.Result;
-import com.yahoo.messagebus.SourceSession;
-import com.yahoo.messagebus.SourceSessionParams;
+import com.yahoo.messagebus.*;
import com.yahoo.messagebus.network.local.LocalNetwork;
import com.yahoo.messagebus.network.rpc.RPCNetwork;
import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
@@ -25,14 +17,16 @@ import java.util.concurrent.TimeUnit;
public class RemoteClient {
private final Slobrok slobrok;
+ private final String slobrokId;
private final MessageBus mbus;
private final ReplyQueue queue = new ReplyQueue();
private final SourceSession session;
- private RemoteClient(Protocol protocol, boolean network) {
- this.slobrok = newSlobrok();
+ private RemoteClient(Slobrok slobrok, String slobrokId, Protocol protocol, boolean network) {
+ this.slobrok = slobrok;
+ this.slobrokId = slobrok != null ? slobrok.configId() : slobrokId;
mbus = network
- ? new MessageBus(new RPCNetwork(new RPCNetworkParams().setSlobroksConfig(slobroksConfig())),
+ ? new MessageBus(new RPCNetwork(new RPCNetworkParams().setSlobrokConfigId(this.slobrokId)),
new MessageBusParams().addProtocol(protocol))
: new MessageBus(new LocalNetwork(), new MessageBusParams().addProtocol(protocol));
session = mbus.createSourceSession(new SourceSessionParams().setThrottlePolicy(null).setReplyHandler(queue));
@@ -46,22 +40,28 @@ public class RemoteClient {
return queue.awaitReply(timeout, unit);
}
- public SlobroksConfig slobroksConfig() {
- return TestUtils.configFor(slobrok);
+ public String slobrokId() {
+ return slobrokId;
}
public void close() {
session.destroy();
mbus.destroy();
- slobrok.stop();
+ if (slobrok != null) {
+ slobrok.stop();
+ }
}
public static RemoteClient newInstanceWithInternSlobrok(boolean network) {
- return new RemoteClient(new SimpleProtocol(), network);
+ return new RemoteClient(newSlobrok(), null, new SimpleProtocol(), network);
+ }
+
+ public static RemoteClient newInstanceWithExternSlobrok(String slobrokId, boolean network) {
+ return new RemoteClient(null, slobrokId, new SimpleProtocol(), network);
}
public static RemoteClient newInstanceWithProtocolAndInternSlobrok(Protocol protocol, boolean network) {
- return new RemoteClient(protocol, network);
+ return new RemoteClient(newSlobrok(), null, protocol, network);
}
private static Slobrok newSlobrok() {
diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteServer.java b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteServer.java
index 0fb23e33709..1f0f82c4903 100644
--- a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteServer.java
+++ b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteServer.java
@@ -1,16 +1,9 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.messagebus.jdisc.test;
-import com.yahoo.cloud.config.SlobroksConfig;
import com.yahoo.jrt.ListenFailedException;
import com.yahoo.jrt.slobrok.server.Slobrok;
-import com.yahoo.messagebus.DestinationSession;
-import com.yahoo.messagebus.DestinationSessionParams;
-import com.yahoo.messagebus.Message;
-import com.yahoo.messagebus.MessageBus;
-import com.yahoo.messagebus.MessageBusParams;
-import com.yahoo.messagebus.Protocol;
-import com.yahoo.messagebus.Reply;
+import com.yahoo.messagebus.*;
import com.yahoo.messagebus.network.Identity;
import com.yahoo.messagebus.network.rpc.RPCNetwork;
import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
@@ -24,14 +17,16 @@ import java.util.concurrent.TimeUnit;
public class RemoteServer {
private final Slobrok slobrok;
+ private final String slobrokId;
private final MessageBus mbus;
private final MessageQueue queue = new MessageQueue();
private final DestinationSession session;
- private RemoteServer(Protocol protocol, String identity) {
- this.slobrok = newSlobrok();
+ private RemoteServer(Slobrok slobrok, String slobrokId, Protocol protocol, String identity) {
+ this.slobrok = slobrok;
+ this.slobrokId = slobrok != null ? slobrok.configId() : slobrokId;
mbus = new MessageBus(new RPCNetwork(new RPCNetworkParams()
- .setSlobroksConfig(slobroksConfig())
+ .setSlobrokConfigId(this.slobrokId)
.setIdentity(new Identity(identity))),
new MessageBusParams().addProtocol(protocol));
session = mbus.createDestinationSession(new DestinationSessionParams().setMessageHandler(queue));
@@ -53,22 +48,32 @@ public class RemoteServer {
session.reply(reply);
}
- public SlobroksConfig slobroksConfig() {
- return TestUtils.configFor(slobrok);
+ public String slobrokId() {
+ return slobrokId;
}
public void close() {
session.destroy();
mbus.destroy();
- slobrok.stop();
+ if (slobrok != null) {
+ slobrok.stop();
+ }
}
public static RemoteServer newInstanceWithInternSlobrok() {
- return new RemoteServer(new SimpleProtocol(), "remote");
+ return new RemoteServer(newSlobrok(), null, new SimpleProtocol(), "remote");
+ }
+
+ public static RemoteServer newInstanceWithExternSlobrok(String slobrokId) {
+ return new RemoteServer(null, slobrokId, new SimpleProtocol(), "remote");
+ }
+
+ public static RemoteServer newInstance(String slobrokId, String identity, Protocol protocol) {
+ return new RemoteServer(null, slobrokId, protocol, identity);
}
- public static RemoteServer newInstance(String identity, Protocol protocol) {
- return new RemoteServer(protocol, identity);
+ public static RemoteServer newInstanceWithProtocol(Protocol protocol) {
+ return new RemoteServer(newSlobrok(), null, protocol, "remote");
}
private static Slobrok newSlobrok() {
diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/ServerTestDriver.java b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/ServerTestDriver.java
index fa0dd37ed13..e59db28e886 100644
--- a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/ServerTestDriver.java
+++ b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/ServerTestDriver.java
@@ -42,7 +42,7 @@ public class ServerTestDriver {
}
MessageBusParams mbusParams = new MessageBusParams().addProtocol(protocol);
- RPCNetworkParams netParams = new RPCNetworkParams().setSlobroksConfig(client.slobroksConfig());
+ RPCNetworkParams netParams = new RPCNetworkParams().setSlobrokConfigId(client.slobrokId());
SharedMessageBus mbus = SharedMessageBus.newInstance(mbusParams, netParams);
ServerSession session = mbus.newDestinationSession(new DestinationSessionParams());
server = new MbusServer(driver, session);
@@ -130,6 +130,13 @@ public class ServerTestDriver {
guiceModules);
}
+ public static ServerTestDriver newInstanceWithExternSlobrok(String slobrokId, RequestHandler requestHandler,
+ boolean network, Module... guiceModules)
+ {
+ return new ServerTestDriver(RemoteClient.newInstanceWithExternSlobrok(slobrokId, network),
+ true, requestHandler, new SimpleProtocol(), guiceModules);
+ }
+
public static ServerTestDriver newInactiveInstance(boolean network, Module... guiceModules) {
return new ServerTestDriver(RemoteClient.newInstanceWithInternSlobrok(network), false, null,
new SimpleProtocol(), guiceModules);
diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/TestUtils.java b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/TestUtils.java
deleted file mode 100644
index 85ad241259f..00000000000
--- a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/TestUtils.java
+++ /dev/null
@@ -1,21 +0,0 @@
-// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.messagebus.jdisc.test;
-
-import com.yahoo.cloud.config.SlobroksConfig;
-import com.yahoo.jrt.Spec;
-import com.yahoo.jrt.slobrok.server.Slobrok;
-
-/**
- * @author jonmv
- */
-public class TestUtils {
-
- private TestUtils() { }
-
- public static SlobroksConfig configFor(Slobrok slobrok) {
- return new SlobroksConfig.Builder().slobrok(new SlobroksConfig.Slobrok.Builder()
- .connectionspec(new Spec("localhost", slobrok.port()).toString()))
- .build();
- }
-
-}
diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedMessageBus.java b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedMessageBus.java
index 259dd788244..dd135a51378 100644
--- a/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedMessageBus.java
+++ b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedMessageBus.java
@@ -3,8 +3,6 @@ package com.yahoo.messagebus.shared;
import com.yahoo.config.subscription.ConfigGetter;
import com.yahoo.jdisc.AbstractResource;
-
-import java.util.Objects;
import java.util.logging.Level;
import com.yahoo.messagebus.DestinationSessionParams;
import com.yahoo.messagebus.IntermediateSessionParams;
@@ -27,7 +25,8 @@ public class SharedMessageBus extends AbstractResource {
private final MessageBus mbus;
public SharedMessageBus(MessageBus mbus) {
- this.mbus = Objects.requireNonNull(mbus);
+ mbus.getClass(); // throws NullPointerException
+ this.mbus = mbus;
}
public MessageBus messageBus() {
diff --git a/container-messagebus/src/test/java/com/yahoo/container/jdisc/messagebus/MbusClientProviderTest.java b/container-messagebus/src/test/java/com/yahoo/container/jdisc/messagebus/MbusClientProviderTest.java
index 9512c1a4873..6335cf01d8c 100644
--- a/container-messagebus/src/test/java/com/yahoo/container/jdisc/messagebus/MbusClientProviderTest.java
+++ b/container-messagebus/src/test/java/com/yahoo/container/jdisc/messagebus/MbusClientProviderTest.java
@@ -1,16 +1,9 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.container.jdisc.messagebus;
-import com.yahoo.cloud.config.SlobroksConfig;
-import com.yahoo.container.jdisc.ContainerMbusConfig;
import com.yahoo.container.jdisc.config.SessionConfig;
import com.yahoo.container.jdisc.messagebus.MbusClientProvider;
import com.yahoo.container.jdisc.messagebus.SessionCache;
-import com.yahoo.document.config.DocumentmanagerConfig;
-import com.yahoo.documentapi.messagebus.protocol.DocumentProtocolPoliciesConfig;
-import com.yahoo.messagebus.MessagebusConfig;
-import com.yahoo.vespa.config.content.DistributionConfig;
-import com.yahoo.vespa.config.content.LoadTypeConfig;
import org.junit.Test;
import static org.junit.Assert.assertNotNull;
@@ -37,15 +30,7 @@ public class MbusClientProviderTest {
}
private void testClient(SessionConfig config) {
- SessionCache cache = new SessionCache(new ContainerMbusConfig.Builder().build(),
- new DocumentmanagerConfig.Builder().build(),
- new LoadTypeConfig.Builder().build(),
- new SlobroksConfig.Builder().build(),
- new MessagebusConfig.Builder().build(),
- new DocumentProtocolPoliciesConfig.Builder().build(),
- new DistributionConfig.Builder().build(),
- "test");
- MbusClientProvider p = new MbusClientProvider(cache, config);
+ MbusClientProvider p = new MbusClientProvider(new SessionCache("dir:src/test/resources/config/clientprovider"), config);
assertNotNull(p.get());
p.deconstruct();
}
diff --git a/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/test/ClientTestDriverTestCase.java b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/test/ClientTestDriverTestCase.java
index d24acb9d0d8..ef290a070cb 100644
--- a/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/test/ClientTestDriverTestCase.java
+++ b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/test/ClientTestDriverTestCase.java
@@ -23,6 +23,10 @@ public class ClientTestDriverTestCase {
driver = ClientTestDriver.newInstanceWithProtocol(new SimpleProtocol());
assertNotNull(driver);
assertTrue(driver.close());
- }
+ Slobrok slobrok = new Slobrok();
+ driver = ClientTestDriver.newInstanceWithExternSlobrok(slobrok.configId());
+ assertNotNull(driver);
+ assertTrue(driver.close());
+ }
}
diff --git a/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/test/ServerTestDriverTestCase.java b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/test/ServerTestDriverTestCase.java
index 7b2c1b68549..f6ae2335d12 100644
--- a/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/test/ServerTestDriverTestCase.java
+++ b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/test/ServerTestDriverTestCase.java
@@ -24,6 +24,11 @@ public class ServerTestDriverTestCase {
driver = ServerTestDriver.newInstanceWithProtocol(new SimpleProtocol(), new NonWorkingRequestHandler(), false);
assertNotNull(driver);
assertTrue(driver.close());
+
+ Slobrok slobrok = new Slobrok();
+ driver = ServerTestDriver.newInstanceWithExternSlobrok(slobrok.configId(), new NonWorkingRequestHandler(), false);
+ assertNotNull(driver);
+ assertTrue(driver.close());
}
}
diff --git a/container-messagebus/src/test/java/com/yahoo/messagebus/shared/SharedDestinationSessionTestCase.java b/container-messagebus/src/test/java/com/yahoo/messagebus/shared/SharedDestinationSessionTestCase.java
index 759509946ce..78e79da4b9f 100644
--- a/container-messagebus/src/test/java/com/yahoo/messagebus/shared/SharedDestinationSessionTestCase.java
+++ b/container-messagebus/src/test/java/com/yahoo/messagebus/shared/SharedDestinationSessionTestCase.java
@@ -1,14 +1,12 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.messagebus.shared;
-import com.yahoo.cloud.config.SlobroksConfig;
import com.yahoo.jrt.ListenFailedException;
import com.yahoo.jrt.slobrok.server.Slobrok;
import com.yahoo.messagebus.*;
import com.yahoo.messagebus.jdisc.test.MessageQueue;
import com.yahoo.messagebus.jdisc.test.RemoteClient;
import com.yahoo.messagebus.jdisc.test.ReplyQueue;
-import com.yahoo.messagebus.jdisc.test.TestUtils;
import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
import com.yahoo.messagebus.routing.Route;
import com.yahoo.messagebus.test.SimpleMessage;
@@ -100,7 +98,7 @@ public class SharedDestinationSessionTestCase {
RemoteClient client = RemoteClient.newInstanceWithInternSlobrok(true);
MessageQueue queue = new MessageQueue();
DestinationSessionParams params = new DestinationSessionParams().setMessageHandler(queue);
- SharedDestinationSession session = newDestinationSession(client.slobroksConfig(), params);
+ SharedDestinationSession session = newDestinationSession(client.slobrokId(), params);
Route route = Route.parse(session.connectionSpec());
assertTrue(client.sendMessage(new SimpleMessage("foo").setRoute(route)).isAccepted());
@@ -122,11 +120,11 @@ public class SharedDestinationSessionTestCase {
} catch (ListenFailedException e) {
fail();
}
- return newDestinationSession(TestUtils.configFor(slobrok), new DestinationSessionParams());
+ return newDestinationSession(slobrok.configId(), new DestinationSessionParams());
}
- private static SharedDestinationSession newDestinationSession(SlobroksConfig slobroksConfig, DestinationSessionParams params) {
- RPCNetworkParams netParams = new RPCNetworkParams().setSlobroksConfig(slobroksConfig);
+ private static SharedDestinationSession newDestinationSession(String slobrokId, DestinationSessionParams params) {
+ RPCNetworkParams netParams = new RPCNetworkParams().setSlobrokConfigId(slobrokId);
MessageBusParams mbusParams = new MessageBusParams().addProtocol(new SimpleProtocol());
SharedMessageBus mbus = SharedMessageBus.newInstance(mbusParams, netParams);
SharedDestinationSession session = mbus.newDestinationSession(params);
diff --git a/container-messagebus/src/test/java/com/yahoo/messagebus/shared/SharedIntermediateSessionTestCase.java b/container-messagebus/src/test/java/com/yahoo/messagebus/shared/SharedIntermediateSessionTestCase.java
index ca8301d1ea9..87958415149 100644
--- a/container-messagebus/src/test/java/com/yahoo/messagebus/shared/SharedIntermediateSessionTestCase.java
+++ b/container-messagebus/src/test/java/com/yahoo/messagebus/shared/SharedIntermediateSessionTestCase.java
@@ -1,7 +1,6 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.messagebus.shared;
-import com.yahoo.cloud.config.SlobroksConfig;
import com.yahoo.jrt.ListenFailedException;
import com.yahoo.jrt.slobrok.server.Slobrok;
import com.yahoo.messagebus.*;
@@ -9,7 +8,6 @@ import com.yahoo.messagebus.jdisc.test.MessageQueue;
import com.yahoo.messagebus.jdisc.test.RemoteClient;
import com.yahoo.messagebus.jdisc.test.RemoteServer;
import com.yahoo.messagebus.jdisc.test.ReplyQueue;
-import com.yahoo.messagebus.jdisc.test.TestUtils;
import com.yahoo.messagebus.network.local.LocalNetwork;
import com.yahoo.messagebus.network.local.LocalWire;
import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
@@ -79,7 +77,7 @@ public class SharedIntermediateSessionTestCase {
public void requireThatReplyHandlerCanNotBeSet() throws ListenFailedException {
Slobrok slobrok = new Slobrok();
try {
- newIntermediateSession(TestUtils.configFor(slobrok),
+ newIntermediateSession(slobrok.configId(),
new IntermediateSessionParams().setReplyHandler(new ReplyQueue()),
false);
fail();
@@ -113,7 +111,7 @@ public class SharedIntermediateSessionTestCase {
@Test
public void requireThatSessionCanSendMessage() throws InterruptedException {
RemoteServer server = RemoteServer.newInstanceWithInternSlobrok();
- SharedIntermediateSession session = newIntermediateSession(server.slobroksConfig(),
+ SharedIntermediateSession session = newIntermediateSession(server.slobrokId(),
new IntermediateSessionParams(),
true);
ReplyQueue queue = new ReplyQueue();
@@ -136,7 +134,7 @@ public class SharedIntermediateSessionTestCase {
RemoteClient client = RemoteClient.newInstanceWithInternSlobrok(true);
MessageQueue queue = new MessageQueue();
IntermediateSessionParams params = new IntermediateSessionParams().setMessageHandler(queue);
- SharedIntermediateSession session = newIntermediateSession(client.slobroksConfig(), params, true);
+ SharedIntermediateSession session = newIntermediateSession(client.slobrokId(), params, true);
Route route = Route.parse(session.connectionSpec());
assertTrue(client.sendMessage(new SimpleMessage("foo").setRoute(route)).isAccepted());
@@ -158,13 +156,13 @@ public class SharedIntermediateSessionTestCase {
} catch (ListenFailedException e) {
fail();
}
- return newIntermediateSession(TestUtils.configFor(slobrok), new IntermediateSessionParams(), network);
+ return newIntermediateSession(slobrok.configId(), new IntermediateSessionParams(), network);
}
- private static SharedIntermediateSession newIntermediateSession(SlobroksConfig slobroksConfig,
+ private static SharedIntermediateSession newIntermediateSession(String slobrokId,
IntermediateSessionParams params,
boolean network) {
- RPCNetworkParams netParams = new RPCNetworkParams().setSlobroksConfig(slobroksConfig);
+ RPCNetworkParams netParams = new RPCNetworkParams().setSlobrokConfigId(slobrokId);
MessageBusParams mbusParams = new MessageBusParams().addProtocol(new SimpleProtocol());
SharedMessageBus mbus = network
? SharedMessageBus.newInstance(mbusParams, netParams)
diff --git a/container-messagebus/src/test/java/com/yahoo/messagebus/shared/SharedSourceSessionTestCase.java b/container-messagebus/src/test/java/com/yahoo/messagebus/shared/SharedSourceSessionTestCase.java
index 7a4a3d17170..1f0966fc961 100644
--- a/container-messagebus/src/test/java/com/yahoo/messagebus/shared/SharedSourceSessionTestCase.java
+++ b/container-messagebus/src/test/java/com/yahoo/messagebus/shared/SharedSourceSessionTestCase.java
@@ -1,7 +1,6 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.messagebus.shared;
-import com.yahoo.cloud.config.SlobroksConfig;
import com.yahoo.jrt.ListenFailedException;
import com.yahoo.jrt.slobrok.server.Slobrok;
import com.yahoo.messagebus.Message;
@@ -9,7 +8,6 @@ import com.yahoo.messagebus.MessageBusParams;
import com.yahoo.messagebus.SourceSessionParams;
import com.yahoo.messagebus.jdisc.test.RemoteServer;
import com.yahoo.messagebus.jdisc.test.ReplyQueue;
-import com.yahoo.messagebus.jdisc.test.TestUtils;
import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
import com.yahoo.messagebus.routing.Route;
import com.yahoo.messagebus.test.SimpleMessage;
@@ -61,7 +59,7 @@ public class SharedSourceSessionTestCase {
@Test
public void requireThatSessionCanSendMessage() throws InterruptedException {
RemoteServer server = RemoteServer.newInstanceWithInternSlobrok();
- SharedSourceSession session = newSourceSession(server.slobroksConfig(),
+ SharedSourceSession session = newSourceSession(server.slobrokId(),
new SourceSessionParams());
ReplyQueue queue = new ReplyQueue();
Message msg = new SimpleMessage("foo").setRoute(Route.parse(server.connectionSpec()));
@@ -82,11 +80,11 @@ public class SharedSourceSessionTestCase {
} catch (ListenFailedException e) {
fail();
}
- return newSourceSession(TestUtils.configFor(slobrok), params);
+ return newSourceSession(slobrok.configId(), params);
}
- private static SharedSourceSession newSourceSession(SlobroksConfig slobroksConfig, SourceSessionParams params) {
- RPCNetworkParams netParams = new RPCNetworkParams().setSlobroksConfig(slobroksConfig);
+ private static SharedSourceSession newSourceSession(String slobrokId, SourceSessionParams params) {
+ RPCNetworkParams netParams = new RPCNetworkParams().setSlobrokConfigId(slobrokId);
MessageBusParams mbusParams = new MessageBusParams().addProtocol(new SimpleProtocol());
SharedMessageBus mbus = SharedMessageBus.newInstance(mbusParams, netParams);
SharedSourceSession session = mbus.newSourceSession(params);
diff --git a/container-messagebus/src/test/resources/config/clientprovider/container-mbus.cfg b/container-messagebus/src/test/resources/config/clientprovider/container-mbus.cfg
new file mode 100644
index 00000000000..e69de29bb2d
--- /dev/null
+++ b/container-messagebus/src/test/resources/config/clientprovider/container-mbus.cfg
diff --git a/container-messagebus/src/test/resources/config/clientprovider/documentmanager.cfg b/container-messagebus/src/test/resources/config/clientprovider/documentmanager.cfg
new file mode 100644
index 00000000000..e69de29bb2d
--- /dev/null
+++ b/container-messagebus/src/test/resources/config/clientprovider/documentmanager.cfg
diff --git a/container-messagebus/src/test/resources/config/clientprovider/load-type.cfg b/container-messagebus/src/test/resources/config/clientprovider/load-type.cfg
new file mode 100644
index 00000000000..e69de29bb2d
--- /dev/null
+++ b/container-messagebus/src/test/resources/config/clientprovider/load-type.cfg
diff --git a/container-messagebus/src/test/resources/config/clientprovider/messagebus.cfg b/container-messagebus/src/test/resources/config/clientprovider/messagebus.cfg
new file mode 100644
index 00000000000..e69de29bb2d
--- /dev/null
+++ b/container-messagebus/src/test/resources/config/clientprovider/messagebus.cfg
diff --git a/container-messagebus/src/test/resources/config/clientprovider/slobroks.cfg b/container-messagebus/src/test/resources/config/clientprovider/slobroks.cfg
new file mode 100644
index 00000000000..e69de29bb2d
--- /dev/null
+++ b/container-messagebus/src/test/resources/config/clientprovider/slobroks.cfg