aboutsummaryrefslogtreecommitdiffstats
path: root/container-messagebus
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2021-07-02 13:16:25 +0200
committerJon Marius Venstad <venstad@gmail.com>2021-08-16 21:01:03 +0200
commitba0cf910aec5e4bd921e4e7d77010982537490ad (patch)
tree387a783c56a0e2abaa1a57fc578a210f289a622d /container-messagebus
parent249377274b11826daffe0ae8c2bf3424e758dadb (diff)
Revert "Merge pull request #18515 from vespa-engine/revert-18507-jonmv/reapply-session-cache-with-injected-config"
This reverts commit fdea20ac7ba02313c94246e7cf77125320147d56, reversing changes made to 7537c42fa661952916cf91eebbb75e6984505100.
Diffstat (limited to 'container-messagebus')
-rw-r--r--container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java114
-rw-r--r--container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/ClientTestDriver.java5
-rw-r--r--container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteClient.java34
-rw-r--r--container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteServer.java39
-rw-r--r--container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/ServerTestDriver.java9
-rw-r--r--container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/TestUtils.java21
-rw-r--r--container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedMessageBus.java5
-rw-r--r--container-messagebus/src/test/java/com/yahoo/container/jdisc/messagebus/MbusClientProviderTest.java17
-rw-r--r--container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/test/ClientTestDriverTestCase.java6
-rw-r--r--container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/test/ServerTestDriverTestCase.java5
-rw-r--r--container-messagebus/src/test/java/com/yahoo/messagebus/shared/SharedDestinationSessionTestCase.java10
-rw-r--r--container-messagebus/src/test/java/com/yahoo/messagebus/shared/SharedIntermediateSessionTestCase.java14
-rw-r--r--container-messagebus/src/test/java/com/yahoo/messagebus/shared/SharedSourceSessionTestCase.java10
-rw-r--r--container-messagebus/src/test/resources/config/clientprovider/container-mbus.cfg0
-rw-r--r--container-messagebus/src/test/resources/config/clientprovider/documentmanager.cfg0
-rw-r--r--container-messagebus/src/test/resources/config/clientprovider/load-type.cfg0
-rw-r--r--container-messagebus/src/test/resources/config/clientprovider/messagebus.cfg0
-rw-r--r--container-messagebus/src/test/resources/config/clientprovider/slobroks.cfg0
18 files changed, 147 insertions, 142 deletions
diff --git a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java
index d65b2f7cc12..ab3d09af178 100644
--- a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java
+++ b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java
@@ -1,22 +1,24 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.container.jdisc.messagebus;
+import com.google.inject.Inject;
+import com.yahoo.cloud.config.SlobroksConfig;
import com.yahoo.component.AbstractComponent;
-import com.yahoo.config.subscription.ConfigGetter;
import com.yahoo.container.jdisc.ContainerMbusConfig;
import com.yahoo.document.DocumentTypeManager;
import com.yahoo.document.DocumentUtil;
+import com.yahoo.document.config.DocumentmanagerConfig;
import com.yahoo.documentapi.messagebus.loadtypes.LoadTypeSet;
import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
+import com.yahoo.documentapi.messagebus.protocol.DocumentProtocolPoliciesConfig;
import com.yahoo.jdisc.ReferencedResource;
import com.yahoo.jdisc.References;
import com.yahoo.jdisc.ResourceReference;
import com.yahoo.jdisc.SharedResource;
-import java.util.logging.Level;
-import com.yahoo.messagebus.ConfigAgent;
import com.yahoo.messagebus.DynamicThrottlePolicy;
import com.yahoo.messagebus.IntermediateSessionParams;
import com.yahoo.messagebus.MessageBusParams;
+import com.yahoo.messagebus.MessagebusConfig;
import com.yahoo.messagebus.Protocol;
import com.yahoo.messagebus.SourceSessionParams;
import com.yahoo.messagebus.StaticThrottlePolicy;
@@ -26,9 +28,12 @@ import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
import com.yahoo.messagebus.shared.SharedIntermediateSession;
import com.yahoo.messagebus.shared.SharedMessageBus;
import com.yahoo.messagebus.shared.SharedSourceSession;
+import com.yahoo.vespa.config.content.DistributionConfig;
+import com.yahoo.vespa.config.content.LoadTypeConfig;
import java.util.HashMap;
import java.util.Map;
+import java.util.logging.Level;
import java.util.logging.Logger;
/**
@@ -37,24 +42,19 @@ import java.util.logging.Logger;
* @author Steinar Knutsen
* @author Einar Rosenvinge
*/
-// TODO jonmv: Remove this? Only used sensibly by FeedHandlerV3, where only timeout varies.
+// TODO jonmv: Remove this: only used with more than one entry by FeedHandlerV3, where only timeout varies.
+// rant: This whole construct is because DI at one point didn't exist, so getting hold of a shared resource
+// or session was hard(?), and one resorted to routing through the Container, using URIs, to the correct
+// MbusClient, with or without throttling. This introduced the problem of ownership during shutdown,
+// which was solved with manual reference counting. This is all much better solved with DI, which (now)
+// owns everything, and does component shutdown in reverse construction order, which is always right.
+// So for the sake everyone's mental health, this should all just be removed now! I suspect this is
+// even the case for Request; we can track in handlers, and warn when requests have been misplaced.
public final class SessionCache extends AbstractComponent {
private static final Logger log = Logger.getLogger(SessionCache.class.getName());
- //config
- private final String messagebusConfigId;
- private final String slobrokConfigId;
- private final String identity;
- private final String containerMbusConfigId;
- private final String documentManagerConfigId;
- private final String loadTypeConfigId;
- private final DocumentTypeManager documentTypeManager;
-
- // initialized in start()
- private ConfigAgent configAgent;
- private SharedMessageBus messageBus;
-
+ private final SharedMessageBus messageBus;
private final Object intermediateLock = new Object();
private final Map<String, SharedIntermediateSession> intermediates = new HashMap<>();
private final IntermediateSessionCreator intermediatesCreator = new IntermediateSessionCreator();
@@ -63,52 +63,48 @@ public final class SessionCache extends AbstractComponent {
private final Map<SourceSessionKey, SharedSourceSession> sources = new HashMap<>();
private final SourceSessionCreator sourcesCreator = new SourceSessionCreator();
- public SessionCache(String messagebusConfigId, String slobrokConfigId, String identity,
- String containerMbusConfigId, String documentManagerConfigId,
- String loadTypeConfigId,
- DocumentTypeManager documentTypeManager) {
- this.messagebusConfigId = messagebusConfigId;
- this.slobrokConfigId = slobrokConfigId;
- this.identity = identity;
- this.containerMbusConfigId = containerMbusConfigId;
- this.documentManagerConfigId = documentManagerConfigId;
- this.loadTypeConfigId = loadTypeConfigId;
- this.documentTypeManager = documentTypeManager;
+ @Inject
+ public SessionCache(ContainerMbusConfig containerMbusConfig, DocumentmanagerConfig documentmanagerConfig,
+ LoadTypeConfig loadTypeConfig, SlobroksConfig slobroksConfig,
+ MessagebusConfig messagebusConfig, DocumentProtocolPoliciesConfig policiesConfig,
+ DistributionConfig distributionConfig) {
+ this(containerMbusConfig, documentmanagerConfig, loadTypeConfig, slobroksConfig,
+ messagebusConfig, policiesConfig, distributionConfig, System.getProperty("config.id")); //:
}
- public SessionCache(final String identity) {
- this(identity, identity, identity, identity, identity, identity, new DocumentTypeManager());
- }
-
- public void deconstruct() {
- if (configAgent != null) {
- configAgent.shutdown();
- }
+ public SessionCache(ContainerMbusConfig containerMbusConfig, DocumentmanagerConfig documentmanagerConfig,
+ LoadTypeConfig loadTypeConfig, SlobroksConfig slobroksConfig,
+ MessagebusConfig messagebusConfig, DocumentProtocolPoliciesConfig policiesConfig,
+ DistributionConfig distributionConfig, String identity) {
+ this.messageBus = createSharedMessageBus(containerMbusConfig,
+ messagebusConfig,
+ slobroksConfig,
+ identity,
+ new DocumentProtocol(new DocumentTypeManager(documentmanagerConfig),
+ new LoadTypeSet(loadTypeConfig),
+ policiesConfig,
+ distributionConfig));
}
-
- private void start() {
- ContainerMbusConfig mbusConfig = ConfigGetter.getConfig(ContainerMbusConfig.class, containerMbusConfigId);
- if (documentManagerConfigId != null) {
- documentTypeManager.configure(documentManagerConfigId);
- }
- LoadTypeSet loadTypeSet = new LoadTypeSet(loadTypeConfigId);
- DocumentProtocol protocol = new DocumentProtocol(documentTypeManager, identity, loadTypeSet);
- messageBus = createSharedMessageBus(mbusConfig, slobrokConfigId, identity, protocol);
- // TODO: stop doing subscriptions to config when that is to be solved in slobrok as well
- configAgent = new ConfigAgent(messagebusConfigId, messageBus.messageBus());
- configAgent.subscribe();
+ public SessionCache(ContainerMbusConfig containerMbusConfig, SlobroksConfig slobroksConfig,
+ MessagebusConfig messagebusConfig, String identity, Protocol protocol) {
+ this.messageBus = createSharedMessageBus(containerMbusConfig,
+ messagebusConfig,
+ slobroksConfig,
+ identity,
+ protocol);
}
-
- private boolean isStarted() {
- return messageBus != null;
+ public void deconstruct() {
+ messageBus.release();
}
private static SharedMessageBus createSharedMessageBus(ContainerMbusConfig mbusConfig,
- String slobrokConfigId, String identity,
+ MessagebusConfig messagebusConfig,
+ SlobroksConfig slobroksConfig, String identity,
Protocol protocol) {
- MessageBusParams mbusParams = new MessageBusParams().addProtocol(protocol);
+ MessageBusParams mbusParams = new MessageBusParams().addProtocol(protocol)
+ .setMessageBusConfig(messagebusConfig);
int maxPendingSize = DocumentUtil
.calculateMaxPendingSize(mbusConfig.maxConcurrentFactor(), mbusConfig.documentExpansionFactor(),
@@ -119,7 +115,7 @@ public final class SessionCache extends AbstractComponent {
mbusParams.setMaxPendingSize(maxPendingSize);
RPCNetworkParams netParams = new RPCNetworkParams()
- .setSlobrokConfigId(slobrokConfigId)
+ .setSlobroksConfig(slobroksConfig)
.setIdentity(new Identity(identity))
.setListenPort(mbusConfig.port())
.setNumTargetsPerSpec(mbusConfig.numconnectionspertarget())
@@ -144,20 +140,10 @@ public final class SessionCache extends AbstractComponent {
}
ReferencedResource<SharedIntermediateSession> retainIntermediate(final IntermediateSessionParams p) {
- synchronized (this) {
- if (!isStarted()) {
- start();
- }
- }
return intermediatesCreator.retain(intermediateLock, intermediates, p);
}
public ReferencedResource<SharedSourceSession> retainSource(final SourceSessionParams p) {
- synchronized (this) {
- if (!isStarted()) {
- start();
- }
- }
return sourcesCreator.retain(sourceLock, sources, p);
}
diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/ClientTestDriver.java b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/ClientTestDriver.java
index 111805d61b0..ea0ed7eadc8 100644
--- a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/ClientTestDriver.java
+++ b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/ClientTestDriver.java
@@ -33,7 +33,7 @@ public class ClientTestDriver {
this.server = server;
MessageBusParams mbusParams = new MessageBusParams().addProtocol(protocol);
- RPCNetworkParams netParams = new RPCNetworkParams().setSlobrokConfigId(server.slobrokId());
+ RPCNetworkParams netParams = new RPCNetworkParams().setSlobroksConfig(server.slobroksConfig());
SharedMessageBus mbus = SharedMessageBus.newInstance(mbusParams, netParams);
session = mbus.newSourceSession(new SourceSessionParams());
client = new MbusClient(session);
@@ -128,7 +128,4 @@ public class ClientTestDriver {
return new ClientTestDriver(RemoteServer.newInstanceWithInternSlobrok(), protocol);
}
- public static ClientTestDriver newInstanceWithExternSlobrok(String slobrokId) {
- return new ClientTestDriver(RemoteServer.newInstanceWithExternSlobrok(slobrokId), new SimpleProtocol());
- }
}
diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteClient.java b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteClient.java
index 57d0abd980b..6cd8fb8f34d 100644
--- a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteClient.java
+++ b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteClient.java
@@ -1,9 +1,17 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.messagebus.jdisc.test;
+import com.yahoo.cloud.config.SlobroksConfig;
import com.yahoo.jrt.ListenFailedException;
import com.yahoo.jrt.slobrok.server.Slobrok;
-import com.yahoo.messagebus.*;
+import com.yahoo.messagebus.Message;
+import com.yahoo.messagebus.MessageBus;
+import com.yahoo.messagebus.MessageBusParams;
+import com.yahoo.messagebus.Protocol;
+import com.yahoo.messagebus.Reply;
+import com.yahoo.messagebus.Result;
+import com.yahoo.messagebus.SourceSession;
+import com.yahoo.messagebus.SourceSessionParams;
import com.yahoo.messagebus.network.local.LocalNetwork;
import com.yahoo.messagebus.network.rpc.RPCNetwork;
import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
@@ -17,16 +25,14 @@ import java.util.concurrent.TimeUnit;
public class RemoteClient {
private final Slobrok slobrok;
- private final String slobrokId;
private final MessageBus mbus;
private final ReplyQueue queue = new ReplyQueue();
private final SourceSession session;
- private RemoteClient(Slobrok slobrok, String slobrokId, Protocol protocol, boolean network) {
- this.slobrok = slobrok;
- this.slobrokId = slobrok != null ? slobrok.configId() : slobrokId;
+ private RemoteClient(Protocol protocol, boolean network) {
+ this.slobrok = newSlobrok();
mbus = network
- ? new MessageBus(new RPCNetwork(new RPCNetworkParams().setSlobrokConfigId(this.slobrokId)),
+ ? new MessageBus(new RPCNetwork(new RPCNetworkParams().setSlobroksConfig(slobroksConfig())),
new MessageBusParams().addProtocol(protocol))
: new MessageBus(new LocalNetwork(), new MessageBusParams().addProtocol(protocol));
session = mbus.createSourceSession(new SourceSessionParams().setThrottlePolicy(null).setReplyHandler(queue));
@@ -40,28 +46,22 @@ public class RemoteClient {
return queue.awaitReply(timeout, unit);
}
- public String slobrokId() {
- return slobrokId;
+ public SlobroksConfig slobroksConfig() {
+ return TestUtils.configFor(slobrok);
}
public void close() {
session.destroy();
mbus.destroy();
- if (slobrok != null) {
- slobrok.stop();
- }
+ slobrok.stop();
}
public static RemoteClient newInstanceWithInternSlobrok(boolean network) {
- return new RemoteClient(newSlobrok(), null, new SimpleProtocol(), network);
- }
-
- public static RemoteClient newInstanceWithExternSlobrok(String slobrokId, boolean network) {
- return new RemoteClient(null, slobrokId, new SimpleProtocol(), network);
+ return new RemoteClient(new SimpleProtocol(), network);
}
public static RemoteClient newInstanceWithProtocolAndInternSlobrok(Protocol protocol, boolean network) {
- return new RemoteClient(newSlobrok(), null, protocol, network);
+ return new RemoteClient(protocol, network);
}
private static Slobrok newSlobrok() {
diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteServer.java b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteServer.java
index 1f0f82c4903..0fb23e33709 100644
--- a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteServer.java
+++ b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteServer.java
@@ -1,9 +1,16 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.messagebus.jdisc.test;
+import com.yahoo.cloud.config.SlobroksConfig;
import com.yahoo.jrt.ListenFailedException;
import com.yahoo.jrt.slobrok.server.Slobrok;
-import com.yahoo.messagebus.*;
+import com.yahoo.messagebus.DestinationSession;
+import com.yahoo.messagebus.DestinationSessionParams;
+import com.yahoo.messagebus.Message;
+import com.yahoo.messagebus.MessageBus;
+import com.yahoo.messagebus.MessageBusParams;
+import com.yahoo.messagebus.Protocol;
+import com.yahoo.messagebus.Reply;
import com.yahoo.messagebus.network.Identity;
import com.yahoo.messagebus.network.rpc.RPCNetwork;
import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
@@ -17,16 +24,14 @@ import java.util.concurrent.TimeUnit;
public class RemoteServer {
private final Slobrok slobrok;
- private final String slobrokId;
private final MessageBus mbus;
private final MessageQueue queue = new MessageQueue();
private final DestinationSession session;
- private RemoteServer(Slobrok slobrok, String slobrokId, Protocol protocol, String identity) {
- this.slobrok = slobrok;
- this.slobrokId = slobrok != null ? slobrok.configId() : slobrokId;
+ private RemoteServer(Protocol protocol, String identity) {
+ this.slobrok = newSlobrok();
mbus = new MessageBus(new RPCNetwork(new RPCNetworkParams()
- .setSlobrokConfigId(this.slobrokId)
+ .setSlobroksConfig(slobroksConfig())
.setIdentity(new Identity(identity))),
new MessageBusParams().addProtocol(protocol));
session = mbus.createDestinationSession(new DestinationSessionParams().setMessageHandler(queue));
@@ -48,32 +53,22 @@ public class RemoteServer {
session.reply(reply);
}
- public String slobrokId() {
- return slobrokId;
+ public SlobroksConfig slobroksConfig() {
+ return TestUtils.configFor(slobrok);
}
public void close() {
session.destroy();
mbus.destroy();
- if (slobrok != null) {
- slobrok.stop();
- }
+ slobrok.stop();
}
public static RemoteServer newInstanceWithInternSlobrok() {
- return new RemoteServer(newSlobrok(), null, new SimpleProtocol(), "remote");
- }
-
- public static RemoteServer newInstanceWithExternSlobrok(String slobrokId) {
- return new RemoteServer(null, slobrokId, new SimpleProtocol(), "remote");
- }
-
- public static RemoteServer newInstance(String slobrokId, String identity, Protocol protocol) {
- return new RemoteServer(null, slobrokId, protocol, identity);
+ return new RemoteServer(new SimpleProtocol(), "remote");
}
- public static RemoteServer newInstanceWithProtocol(Protocol protocol) {
- return new RemoteServer(newSlobrok(), null, protocol, "remote");
+ public static RemoteServer newInstance(String identity, Protocol protocol) {
+ return new RemoteServer(protocol, identity);
}
private static Slobrok newSlobrok() {
diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/ServerTestDriver.java b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/ServerTestDriver.java
index e59db28e886..fa0dd37ed13 100644
--- a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/ServerTestDriver.java
+++ b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/ServerTestDriver.java
@@ -42,7 +42,7 @@ public class ServerTestDriver {
}
MessageBusParams mbusParams = new MessageBusParams().addProtocol(protocol);
- RPCNetworkParams netParams = new RPCNetworkParams().setSlobrokConfigId(client.slobrokId());
+ RPCNetworkParams netParams = new RPCNetworkParams().setSlobroksConfig(client.slobroksConfig());
SharedMessageBus mbus = SharedMessageBus.newInstance(mbusParams, netParams);
ServerSession session = mbus.newDestinationSession(new DestinationSessionParams());
server = new MbusServer(driver, session);
@@ -130,13 +130,6 @@ public class ServerTestDriver {
guiceModules);
}
- public static ServerTestDriver newInstanceWithExternSlobrok(String slobrokId, RequestHandler requestHandler,
- boolean network, Module... guiceModules)
- {
- return new ServerTestDriver(RemoteClient.newInstanceWithExternSlobrok(slobrokId, network),
- true, requestHandler, new SimpleProtocol(), guiceModules);
- }
-
public static ServerTestDriver newInactiveInstance(boolean network, Module... guiceModules) {
return new ServerTestDriver(RemoteClient.newInstanceWithInternSlobrok(network), false, null,
new SimpleProtocol(), guiceModules);
diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/TestUtils.java b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/TestUtils.java
new file mode 100644
index 00000000000..85ad241259f
--- /dev/null
+++ b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/TestUtils.java
@@ -0,0 +1,21 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.jdisc.test;
+
+import com.yahoo.cloud.config.SlobroksConfig;
+import com.yahoo.jrt.Spec;
+import com.yahoo.jrt.slobrok.server.Slobrok;
+
+/**
+ * @author jonmv
+ */
+public class TestUtils {
+
+ private TestUtils() { }
+
+ public static SlobroksConfig configFor(Slobrok slobrok) {
+ return new SlobroksConfig.Builder().slobrok(new SlobroksConfig.Slobrok.Builder()
+ .connectionspec(new Spec("localhost", slobrok.port()).toString()))
+ .build();
+ }
+
+}
diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedMessageBus.java b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedMessageBus.java
index dd135a51378..259dd788244 100644
--- a/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedMessageBus.java
+++ b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedMessageBus.java
@@ -3,6 +3,8 @@ package com.yahoo.messagebus.shared;
import com.yahoo.config.subscription.ConfigGetter;
import com.yahoo.jdisc.AbstractResource;
+
+import java.util.Objects;
import java.util.logging.Level;
import com.yahoo.messagebus.DestinationSessionParams;
import com.yahoo.messagebus.IntermediateSessionParams;
@@ -25,8 +27,7 @@ public class SharedMessageBus extends AbstractResource {
private final MessageBus mbus;
public SharedMessageBus(MessageBus mbus) {
- mbus.getClass(); // throws NullPointerException
- this.mbus = mbus;
+ this.mbus = Objects.requireNonNull(mbus);
}
public MessageBus messageBus() {
diff --git a/container-messagebus/src/test/java/com/yahoo/container/jdisc/messagebus/MbusClientProviderTest.java b/container-messagebus/src/test/java/com/yahoo/container/jdisc/messagebus/MbusClientProviderTest.java
index 6335cf01d8c..9512c1a4873 100644
--- a/container-messagebus/src/test/java/com/yahoo/container/jdisc/messagebus/MbusClientProviderTest.java
+++ b/container-messagebus/src/test/java/com/yahoo/container/jdisc/messagebus/MbusClientProviderTest.java
@@ -1,9 +1,16 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.container.jdisc.messagebus;
+import com.yahoo.cloud.config.SlobroksConfig;
+import com.yahoo.container.jdisc.ContainerMbusConfig;
import com.yahoo.container.jdisc.config.SessionConfig;
import com.yahoo.container.jdisc.messagebus.MbusClientProvider;
import com.yahoo.container.jdisc.messagebus.SessionCache;
+import com.yahoo.document.config.DocumentmanagerConfig;
+import com.yahoo.documentapi.messagebus.protocol.DocumentProtocolPoliciesConfig;
+import com.yahoo.messagebus.MessagebusConfig;
+import com.yahoo.vespa.config.content.DistributionConfig;
+import com.yahoo.vespa.config.content.LoadTypeConfig;
import org.junit.Test;
import static org.junit.Assert.assertNotNull;
@@ -30,7 +37,15 @@ public class MbusClientProviderTest {
}
private void testClient(SessionConfig config) {
- MbusClientProvider p = new MbusClientProvider(new SessionCache("dir:src/test/resources/config/clientprovider"), config);
+ SessionCache cache = new SessionCache(new ContainerMbusConfig.Builder().build(),
+ new DocumentmanagerConfig.Builder().build(),
+ new LoadTypeConfig.Builder().build(),
+ new SlobroksConfig.Builder().build(),
+ new MessagebusConfig.Builder().build(),
+ new DocumentProtocolPoliciesConfig.Builder().build(),
+ new DistributionConfig.Builder().build(),
+ "test");
+ MbusClientProvider p = new MbusClientProvider(cache, config);
assertNotNull(p.get());
p.deconstruct();
}
diff --git a/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/test/ClientTestDriverTestCase.java b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/test/ClientTestDriverTestCase.java
index ef290a070cb..d24acb9d0d8 100644
--- a/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/test/ClientTestDriverTestCase.java
+++ b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/test/ClientTestDriverTestCase.java
@@ -23,10 +23,6 @@ public class ClientTestDriverTestCase {
driver = ClientTestDriver.newInstanceWithProtocol(new SimpleProtocol());
assertNotNull(driver);
assertTrue(driver.close());
-
- Slobrok slobrok = new Slobrok();
- driver = ClientTestDriver.newInstanceWithExternSlobrok(slobrok.configId());
- assertNotNull(driver);
- assertTrue(driver.close());
}
+
}
diff --git a/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/test/ServerTestDriverTestCase.java b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/test/ServerTestDriverTestCase.java
index f6ae2335d12..7b2c1b68549 100644
--- a/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/test/ServerTestDriverTestCase.java
+++ b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/test/ServerTestDriverTestCase.java
@@ -24,11 +24,6 @@ public class ServerTestDriverTestCase {
driver = ServerTestDriver.newInstanceWithProtocol(new SimpleProtocol(), new NonWorkingRequestHandler(), false);
assertNotNull(driver);
assertTrue(driver.close());
-
- Slobrok slobrok = new Slobrok();
- driver = ServerTestDriver.newInstanceWithExternSlobrok(slobrok.configId(), new NonWorkingRequestHandler(), false);
- assertNotNull(driver);
- assertTrue(driver.close());
}
}
diff --git a/container-messagebus/src/test/java/com/yahoo/messagebus/shared/SharedDestinationSessionTestCase.java b/container-messagebus/src/test/java/com/yahoo/messagebus/shared/SharedDestinationSessionTestCase.java
index 78e79da4b9f..759509946ce 100644
--- a/container-messagebus/src/test/java/com/yahoo/messagebus/shared/SharedDestinationSessionTestCase.java
+++ b/container-messagebus/src/test/java/com/yahoo/messagebus/shared/SharedDestinationSessionTestCase.java
@@ -1,12 +1,14 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.messagebus.shared;
+import com.yahoo.cloud.config.SlobroksConfig;
import com.yahoo.jrt.ListenFailedException;
import com.yahoo.jrt.slobrok.server.Slobrok;
import com.yahoo.messagebus.*;
import com.yahoo.messagebus.jdisc.test.MessageQueue;
import com.yahoo.messagebus.jdisc.test.RemoteClient;
import com.yahoo.messagebus.jdisc.test.ReplyQueue;
+import com.yahoo.messagebus.jdisc.test.TestUtils;
import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
import com.yahoo.messagebus.routing.Route;
import com.yahoo.messagebus.test.SimpleMessage;
@@ -98,7 +100,7 @@ public class SharedDestinationSessionTestCase {
RemoteClient client = RemoteClient.newInstanceWithInternSlobrok(true);
MessageQueue queue = new MessageQueue();
DestinationSessionParams params = new DestinationSessionParams().setMessageHandler(queue);
- SharedDestinationSession session = newDestinationSession(client.slobrokId(), params);
+ SharedDestinationSession session = newDestinationSession(client.slobroksConfig(), params);
Route route = Route.parse(session.connectionSpec());
assertTrue(client.sendMessage(new SimpleMessage("foo").setRoute(route)).isAccepted());
@@ -120,11 +122,11 @@ public class SharedDestinationSessionTestCase {
} catch (ListenFailedException e) {
fail();
}
- return newDestinationSession(slobrok.configId(), new DestinationSessionParams());
+ return newDestinationSession(TestUtils.configFor(slobrok), new DestinationSessionParams());
}
- private static SharedDestinationSession newDestinationSession(String slobrokId, DestinationSessionParams params) {
- RPCNetworkParams netParams = new RPCNetworkParams().setSlobrokConfigId(slobrokId);
+ private static SharedDestinationSession newDestinationSession(SlobroksConfig slobroksConfig, DestinationSessionParams params) {
+ RPCNetworkParams netParams = new RPCNetworkParams().setSlobroksConfig(slobroksConfig);
MessageBusParams mbusParams = new MessageBusParams().addProtocol(new SimpleProtocol());
SharedMessageBus mbus = SharedMessageBus.newInstance(mbusParams, netParams);
SharedDestinationSession session = mbus.newDestinationSession(params);
diff --git a/container-messagebus/src/test/java/com/yahoo/messagebus/shared/SharedIntermediateSessionTestCase.java b/container-messagebus/src/test/java/com/yahoo/messagebus/shared/SharedIntermediateSessionTestCase.java
index 87958415149..ca8301d1ea9 100644
--- a/container-messagebus/src/test/java/com/yahoo/messagebus/shared/SharedIntermediateSessionTestCase.java
+++ b/container-messagebus/src/test/java/com/yahoo/messagebus/shared/SharedIntermediateSessionTestCase.java
@@ -1,6 +1,7 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.messagebus.shared;
+import com.yahoo.cloud.config.SlobroksConfig;
import com.yahoo.jrt.ListenFailedException;
import com.yahoo.jrt.slobrok.server.Slobrok;
import com.yahoo.messagebus.*;
@@ -8,6 +9,7 @@ import com.yahoo.messagebus.jdisc.test.MessageQueue;
import com.yahoo.messagebus.jdisc.test.RemoteClient;
import com.yahoo.messagebus.jdisc.test.RemoteServer;
import com.yahoo.messagebus.jdisc.test.ReplyQueue;
+import com.yahoo.messagebus.jdisc.test.TestUtils;
import com.yahoo.messagebus.network.local.LocalNetwork;
import com.yahoo.messagebus.network.local.LocalWire;
import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
@@ -77,7 +79,7 @@ public class SharedIntermediateSessionTestCase {
public void requireThatReplyHandlerCanNotBeSet() throws ListenFailedException {
Slobrok slobrok = new Slobrok();
try {
- newIntermediateSession(slobrok.configId(),
+ newIntermediateSession(TestUtils.configFor(slobrok),
new IntermediateSessionParams().setReplyHandler(new ReplyQueue()),
false);
fail();
@@ -111,7 +113,7 @@ public class SharedIntermediateSessionTestCase {
@Test
public void requireThatSessionCanSendMessage() throws InterruptedException {
RemoteServer server = RemoteServer.newInstanceWithInternSlobrok();
- SharedIntermediateSession session = newIntermediateSession(server.slobrokId(),
+ SharedIntermediateSession session = newIntermediateSession(server.slobroksConfig(),
new IntermediateSessionParams(),
true);
ReplyQueue queue = new ReplyQueue();
@@ -134,7 +136,7 @@ public class SharedIntermediateSessionTestCase {
RemoteClient client = RemoteClient.newInstanceWithInternSlobrok(true);
MessageQueue queue = new MessageQueue();
IntermediateSessionParams params = new IntermediateSessionParams().setMessageHandler(queue);
- SharedIntermediateSession session = newIntermediateSession(client.slobrokId(), params, true);
+ SharedIntermediateSession session = newIntermediateSession(client.slobroksConfig(), params, true);
Route route = Route.parse(session.connectionSpec());
assertTrue(client.sendMessage(new SimpleMessage("foo").setRoute(route)).isAccepted());
@@ -156,13 +158,13 @@ public class SharedIntermediateSessionTestCase {
} catch (ListenFailedException e) {
fail();
}
- return newIntermediateSession(slobrok.configId(), new IntermediateSessionParams(), network);
+ return newIntermediateSession(TestUtils.configFor(slobrok), new IntermediateSessionParams(), network);
}
- private static SharedIntermediateSession newIntermediateSession(String slobrokId,
+ private static SharedIntermediateSession newIntermediateSession(SlobroksConfig slobroksConfig,
IntermediateSessionParams params,
boolean network) {
- RPCNetworkParams netParams = new RPCNetworkParams().setSlobrokConfigId(slobrokId);
+ RPCNetworkParams netParams = new RPCNetworkParams().setSlobroksConfig(slobroksConfig);
MessageBusParams mbusParams = new MessageBusParams().addProtocol(new SimpleProtocol());
SharedMessageBus mbus = network
? SharedMessageBus.newInstance(mbusParams, netParams)
diff --git a/container-messagebus/src/test/java/com/yahoo/messagebus/shared/SharedSourceSessionTestCase.java b/container-messagebus/src/test/java/com/yahoo/messagebus/shared/SharedSourceSessionTestCase.java
index 1f0966fc961..7a4a3d17170 100644
--- a/container-messagebus/src/test/java/com/yahoo/messagebus/shared/SharedSourceSessionTestCase.java
+++ b/container-messagebus/src/test/java/com/yahoo/messagebus/shared/SharedSourceSessionTestCase.java
@@ -1,6 +1,7 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.messagebus.shared;
+import com.yahoo.cloud.config.SlobroksConfig;
import com.yahoo.jrt.ListenFailedException;
import com.yahoo.jrt.slobrok.server.Slobrok;
import com.yahoo.messagebus.Message;
@@ -8,6 +9,7 @@ import com.yahoo.messagebus.MessageBusParams;
import com.yahoo.messagebus.SourceSessionParams;
import com.yahoo.messagebus.jdisc.test.RemoteServer;
import com.yahoo.messagebus.jdisc.test.ReplyQueue;
+import com.yahoo.messagebus.jdisc.test.TestUtils;
import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
import com.yahoo.messagebus.routing.Route;
import com.yahoo.messagebus.test.SimpleMessage;
@@ -59,7 +61,7 @@ public class SharedSourceSessionTestCase {
@Test
public void requireThatSessionCanSendMessage() throws InterruptedException {
RemoteServer server = RemoteServer.newInstanceWithInternSlobrok();
- SharedSourceSession session = newSourceSession(server.slobrokId(),
+ SharedSourceSession session = newSourceSession(server.slobroksConfig(),
new SourceSessionParams());
ReplyQueue queue = new ReplyQueue();
Message msg = new SimpleMessage("foo").setRoute(Route.parse(server.connectionSpec()));
@@ -80,11 +82,11 @@ public class SharedSourceSessionTestCase {
} catch (ListenFailedException e) {
fail();
}
- return newSourceSession(slobrok.configId(), params);
+ return newSourceSession(TestUtils.configFor(slobrok), params);
}
- private static SharedSourceSession newSourceSession(String slobrokId, SourceSessionParams params) {
- RPCNetworkParams netParams = new RPCNetworkParams().setSlobrokConfigId(slobrokId);
+ private static SharedSourceSession newSourceSession(SlobroksConfig slobroksConfig, SourceSessionParams params) {
+ RPCNetworkParams netParams = new RPCNetworkParams().setSlobroksConfig(slobroksConfig);
MessageBusParams mbusParams = new MessageBusParams().addProtocol(new SimpleProtocol());
SharedMessageBus mbus = SharedMessageBus.newInstance(mbusParams, netParams);
SharedSourceSession session = mbus.newSourceSession(params);
diff --git a/container-messagebus/src/test/resources/config/clientprovider/container-mbus.cfg b/container-messagebus/src/test/resources/config/clientprovider/container-mbus.cfg
deleted file mode 100644
index e69de29bb2d..00000000000
--- a/container-messagebus/src/test/resources/config/clientprovider/container-mbus.cfg
+++ /dev/null
diff --git a/container-messagebus/src/test/resources/config/clientprovider/documentmanager.cfg b/container-messagebus/src/test/resources/config/clientprovider/documentmanager.cfg
deleted file mode 100644
index e69de29bb2d..00000000000
--- a/container-messagebus/src/test/resources/config/clientprovider/documentmanager.cfg
+++ /dev/null
diff --git a/container-messagebus/src/test/resources/config/clientprovider/load-type.cfg b/container-messagebus/src/test/resources/config/clientprovider/load-type.cfg
deleted file mode 100644
index e69de29bb2d..00000000000
--- a/container-messagebus/src/test/resources/config/clientprovider/load-type.cfg
+++ /dev/null
diff --git a/container-messagebus/src/test/resources/config/clientprovider/messagebus.cfg b/container-messagebus/src/test/resources/config/clientprovider/messagebus.cfg
deleted file mode 100644
index e69de29bb2d..00000000000
--- a/container-messagebus/src/test/resources/config/clientprovider/messagebus.cfg
+++ /dev/null
diff --git a/container-messagebus/src/test/resources/config/clientprovider/slobroks.cfg b/container-messagebus/src/test/resources/config/clientprovider/slobroks.cfg
deleted file mode 100644
index e69de29bb2d..00000000000
--- a/container-messagebus/src/test/resources/config/clientprovider/slobroks.cfg
+++ /dev/null