aboutsummaryrefslogtreecommitdiffstats
path: root/container-messagebus
diff options
context:
space:
mode:
authorJon Bratseth <bratseth@yahoo-inc.com>2016-06-15 23:09:44 +0200
committerJon Bratseth <bratseth@yahoo-inc.com>2016-06-15 23:09:44 +0200
commit72231250ed81e10d66bfe70701e64fa5fe50f712 (patch)
tree2728bba1131a6f6e5bdf95afec7d7ff9358dac50 /container-messagebus
Publish
Diffstat (limited to 'container-messagebus')
-rw-r--r--container-messagebus/.gitignore2
-rw-r--r--container-messagebus/OWNERS1
-rw-r--r--container-messagebus/pom.xml81
-rw-r--r--container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/MbusClientProvider.java71
-rw-r--r--container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/MbusServerProvider.java55
-rw-r--r--container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java459
-rw-r--r--container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/package-info.java8
-rw-r--r--container-messagebus/src/main/resources/configdefinitions/container-mbus.def20
-rw-r--r--container-messagebus/src/main/resources/configdefinitions/session.def7
-rw-r--r--container-messagebus/src/test/java/com/yahoo/container/jdisc/messagebus/MbusClientProviderTest.java37
-rw-r--r--container-messagebus/src/test/java/com/yahoo/container/jdisc/messagebus/MbusSessionKeyTestCase.java113
-rw-r--r--container-messagebus/src/test/resources/config/clientprovider/container-mbus.cfg0
-rw-r--r--container-messagebus/src/test/resources/config/clientprovider/documentmanager.cfg0
-rw-r--r--container-messagebus/src/test/resources/config/clientprovider/load-type.cfg0
-rw-r--r--container-messagebus/src/test/resources/config/clientprovider/messagebus.cfg0
-rw-r--r--container-messagebus/src/test/resources/config/clientprovider/slobroks.cfg0
16 files changed, 854 insertions, 0 deletions
diff --git a/container-messagebus/.gitignore b/container-messagebus/.gitignore
new file mode 100644
index 00000000000..3cc25b51fc4
--- /dev/null
+++ b/container-messagebus/.gitignore
@@ -0,0 +1,2 @@
+/pom.xml.build
+/target
diff --git a/container-messagebus/OWNERS b/container-messagebus/OWNERS
new file mode 100644
index 00000000000..90fdb511ae3
--- /dev/null
+++ b/container-messagebus/OWNERS
@@ -0,0 +1 @@
+bakksjo
diff --git a/container-messagebus/pom.xml b/container-messagebus/pom.xml
new file mode 100644
index 00000000000..553f3bc0d8a
--- /dev/null
+++ b/container-messagebus/pom.xml
@@ -0,0 +1,81 @@
+<?xml version="1.0"?>
+<!-- Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
+ http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>com.yahoo.vespa</groupId>
+ <artifactId>parent</artifactId>
+ <version>6-SNAPSHOT</version>
+ <relativePath>../parent/pom.xml</relativePath>
+ </parent>
+ <artifactId>container-messagebus</artifactId>
+ <version>6-SNAPSHOT</version>
+ <packaging>jar</packaging>
+ <dependencies>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.yahoo.vespa</groupId>
+ <artifactId>provided-dependencies</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.yahoo.vespa</groupId>
+ <artifactId>component</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.yahoo.vespa</groupId>
+ <artifactId>container-core</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.yahoo.vespa</groupId>
+ <artifactId>messagebus-disc</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.yahoo.vespa</groupId>
+ <artifactId>documentapi</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <configuration>
+ <archive>
+ <manifestEntries>
+ <Bundle-SymbolicName>${project.artifactId}</Bundle-SymbolicName>
+ </manifestEntries>
+ </archive>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>com.yahoo.vespa</groupId>
+ <artifactId>config-class-plugin</artifactId>
+ <version>${project.version}</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>config-gen</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/MbusClientProvider.java b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/MbusClientProvider.java
new file mode 100644
index 00000000000..99a4960d25d
--- /dev/null
+++ b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/MbusClientProvider.java
@@ -0,0 +1,71 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.container.jdisc.messagebus;
+
+import com.google.inject.Inject;
+import com.yahoo.container.di.componentgraph.Provider;
+import com.yahoo.container.jdisc.config.SessionConfig;
+import com.yahoo.jdisc.ReferencedResource;
+import com.yahoo.messagebus.AllPassThrottlePolicy;
+import com.yahoo.messagebus.IntermediateSessionParams;
+import com.yahoo.messagebus.SourceSessionParams;
+import com.yahoo.messagebus.jdisc.MbusClient;
+import com.yahoo.messagebus.shared.SharedIntermediateSession;
+import com.yahoo.messagebus.shared.SharedSourceSession;
+import com.yahoo.messagebus.ThrottlePolicy;
+import com.yahoo.messagebus.Message;
+
+import com.yahoo.messagebus.Reply;
+
+/**
+ * @author tonytv
+ * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a>
+ */
+public class MbusClientProvider implements Provider<MbusClient> {
+
+ private final MbusClient client;
+
+ private static MbusClient createSourceClient(
+ SessionCache sessionCache,
+ SessionConfig sessionConfig,
+ boolean setAllPassThrottlePolicy) {
+ final SourceSessionParams sourceSessionParams = new SourceSessionParams();
+ if (setAllPassThrottlePolicy) {
+ sourceSessionParams.setThrottlePolicy(new AllPassThrottlePolicy());
+ }
+ try (ReferencedResource<SharedSourceSession> ref = sessionCache.retainSource(sourceSessionParams)) {
+ return new MbusClient(ref.getResource());
+ }
+ }
+
+ @Inject
+ public MbusClientProvider(SessionCache sessionCache, SessionConfig sessionConfig) {
+ switch (sessionConfig.type()) {
+ case INTERMEDIATE:
+ final IntermediateSessionParams intermediateSessionParams =
+ MbusServerProvider.createIntermediateSessionParams(true, sessionConfig.name());
+ try (final ReferencedResource<SharedIntermediateSession> ref =
+ sessionCache.retainIntermediate(intermediateSessionParams)) {
+ client = new MbusClient(ref.getResource());
+ }
+ break;
+ case SOURCE:
+ client = createSourceClient(sessionCache, sessionConfig, false);
+ break;
+ case INTERNAL:
+ client = createSourceClient(sessionCache, sessionConfig, true);
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown session type: " + sessionConfig.type());
+ }
+ }
+
+ @Override
+ public MbusClient get() {
+ return client;
+ }
+
+ @Override
+ public void deconstruct() {
+ client.release();
+ }
+}
diff --git a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/MbusServerProvider.java b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/MbusServerProvider.java
new file mode 100644
index 00000000000..dcd55f8f1f9
--- /dev/null
+++ b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/MbusServerProvider.java
@@ -0,0 +1,55 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.container.jdisc.messagebus;
+
+import com.yahoo.component.ComponentId;
+import com.yahoo.container.di.componentgraph.Provider;
+import com.yahoo.jdisc.ReferencedResource;
+import com.yahoo.jdisc.service.CurrentContainer;
+import com.yahoo.log.LogLevel;
+import com.yahoo.messagebus.IntermediateSessionParams;
+import com.yahoo.messagebus.jdisc.MbusServer;
+import com.yahoo.messagebus.shared.SharedIntermediateSession;
+
+import java.util.logging.Logger;
+
+/**
+ * TODO: Javadoc
+ *
+ * @author tonytv
+ */
+public class MbusServerProvider implements Provider<MbusServer> {
+ private static final Logger log = Logger.getLogger(MbusServerProvider.class.getName());
+
+ private final MbusServer server;
+ private final ReferencedResource<SharedIntermediateSession> sessionRef;
+
+ public MbusServerProvider(ComponentId id, SessionCache sessionCache, CurrentContainer currentContainer) {
+ ComponentId chainId = id.withoutNamespace(); //TODO: this should be a config value instead.
+ sessionRef = sessionCache.retainIntermediate(createIntermediateSessionParams(true, chainId.stringValue()));
+ server = new MbusServer(currentContainer, sessionRef.getResource());
+ }
+
+ static IntermediateSessionParams createIntermediateSessionParams(boolean broadcastName, String name) {
+ IntermediateSessionParams intermediateParams = new IntermediateSessionParams();
+ intermediateParams.setBroadcastName(broadcastName);
+ intermediateParams.setName(name);
+ return intermediateParams;
+ }
+
+ public SharedIntermediateSession getSession() {
+ return sessionRef.getResource();
+ }
+
+ @Override
+ public MbusServer get() {
+ return server;
+ }
+
+ @Override
+ public void deconstruct() {
+ log.log(LogLevel.INFO, "Deconstructing mbus server: " + server);
+ server.close();
+ server.release();
+ sessionRef.getReference().close();
+ }
+}
diff --git a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java
new file mode 100644
index 00000000000..801cbbf85a0
--- /dev/null
+++ b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java
@@ -0,0 +1,459 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.container.jdisc.messagebus;
+
+import com.yahoo.component.AbstractComponent;
+import com.yahoo.config.subscription.ConfigGetter;
+import com.yahoo.container.jdisc.ContainerMbusConfig;
+import com.yahoo.document.DocumentTypeManager;
+import com.yahoo.document.DocumentUtil;
+import com.yahoo.documentapi.messagebus.loadtypes.LoadTypeSet;
+import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
+import com.yahoo.jdisc.ReferencedResource;
+import com.yahoo.jdisc.References;
+import com.yahoo.jdisc.ResourceReference;
+import com.yahoo.jdisc.SharedResource;
+import com.yahoo.log.LogLevel;
+import com.yahoo.messagebus.*;
+import com.yahoo.messagebus.network.Identity;
+import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
+import com.yahoo.messagebus.shared.SharedDestinationSession;
+import com.yahoo.messagebus.shared.SharedIntermediateSession;
+import com.yahoo.messagebus.shared.SharedMessageBus;
+import com.yahoo.messagebus.shared.SharedSourceSession;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.logging.Logger;
+
+/**
+ * Class to encapsulate access to slobrok sessions.
+ *
+ * @author <a href="mailto:steinar@yahoo-inc.com">Steinar Knutsen</a>
+ * @author <a href="mailto:einarmr@yahoo-inc.com">Einar Rosenvinge</a>
+ */
+public final class SessionCache extends AbstractComponent {
+
+ private static final Logger log = Logger.getLogger(SessionCache.class.getName());
+
+ //config
+ private final String messagebusConfigId;
+ private final String slobrokConfigId;
+ private final String identity;
+ private final String containerMbusConfigId;
+ private final String documentManagerConfigId;
+ private final String loadTypeConfigId;
+ private final DocumentTypeManager documentTypeManager;
+
+ // initialized in start()
+ private ConfigAgent configAgent;
+ private SharedMessageBus messageBus;
+
+ private final Object intermediateLock = new Object();
+ private final Map<String, SharedIntermediateSession> intermediates = new HashMap<>();
+ private final IntermediateSessionCreator intermediatesCreator = new IntermediateSessionCreator();
+
+ private final Object destinationLock = new Object();
+ private final Map<String, SharedDestinationSession> destinations = new HashMap<>();
+ private final DestinationSessionCreator destinationsCreator = new DestinationSessionCreator();
+
+ private final Object sourceLock = new Object();
+ private final Map<SourceSessionKey, SharedSourceSession> sources = new HashMap<>();
+ private final SourceSessionCreator sourcesCreator = new SourceSessionCreator();
+
+ public SessionCache(final String messagebusConfigId, final String slobrokConfigId, final String identity,
+ final String containerMbusConfigId, final String documentManagerConfigId,
+ final String loadTypeConfigId,
+ final DocumentTypeManager documentTypeManager) {
+ this.messagebusConfigId = messagebusConfigId;
+ this.slobrokConfigId = slobrokConfigId;
+ this.identity = identity;
+ this.containerMbusConfigId = containerMbusConfigId;
+ this.documentManagerConfigId = documentManagerConfigId;
+ this.loadTypeConfigId = loadTypeConfigId;
+ this.documentTypeManager = documentTypeManager;
+ }
+
+ public SessionCache(final String identity) {
+ this(identity, identity, identity, identity, identity, identity, new DocumentTypeManager());
+ }
+
+ private void start() {
+ ContainerMbusConfig mbusConfig = ConfigGetter.getConfig(ContainerMbusConfig.class, containerMbusConfigId);
+ if (documentManagerConfigId != null) {
+ documentTypeManager.configure(documentManagerConfigId);
+ }
+ LoadTypeSet loadTypeSet = new LoadTypeSet(loadTypeConfigId);
+ DocumentProtocol protocol = new DocumentProtocol(documentTypeManager, identity, loadTypeSet);
+ messageBus = createSharedMessageBus(mbusConfig, slobrokConfigId, identity, protocol);
+ // TODO: stop doing subscriptions to config when that is to be solved in slobrok as well
+ configAgent = new ConfigAgent(messagebusConfigId, messageBus.messageBus());
+ configAgent.subscribe();
+ }
+
+ private boolean isStarted() {
+ return messageBus != null;
+ }
+
+ private static SharedMessageBus createSharedMessageBus(final ContainerMbusConfig mbusConfig,
+ final String slobrokConfigId, final String identity,
+ Protocol protocol) {
+ final MessageBusParams mbusParams = new MessageBusParams().addProtocol(protocol);
+
+ final int maxPendingSize = DocumentUtil
+ .calculateMaxPendingSize(mbusConfig.maxConcurrentFactor(), mbusConfig.documentExpansionFactor(),
+ mbusConfig.containerCoreMemory());
+ logSystemInfo(mbusConfig, maxPendingSize);
+
+ mbusParams.setMaxPendingCount(mbusConfig.maxpendingcount());
+ mbusParams.setMaxPendingSize(maxPendingSize);
+
+ final RPCNetworkParams netParams = new RPCNetworkParams()
+ .setSlobrokConfigId(slobrokConfigId)
+ .setIdentity(new Identity(identity))
+ .setOOSServerPattern("search/cluster.*/rtx/*/clustercontroller")
+ .setListenPort(mbusConfig.port());
+ return SharedMessageBus.newInstance(mbusParams, netParams);
+ }
+
+ private static void logSystemInfo(ContainerMbusConfig containerMbusConfig, long maxPendingSize) {
+ log.log(LogLevel.DEBUG,
+ "Running with maximum heap size of " + (Runtime.getRuntime().maxMemory() / 1024L / 1024L) + " MB");
+ log.log(LogLevel.CONFIG,
+ "Amount of memory reserved for container core: " + containerMbusConfig.containerCoreMemory() + " MB.");
+ log.log(LogLevel.CONFIG,
+ "Running with document expansion factor " + containerMbusConfig.documentExpansionFactor() + "");
+
+ String msgLimit =
+ (containerMbusConfig.maxpendingcount() == 0) ? "unlimited" : "" + containerMbusConfig.maxpendingcount();
+ log.log(LogLevel.CONFIG, ("Starting message bus with max " + msgLimit + " pending messages and max " +
+ (((double) (maxPendingSize / 1024L)) / 1024.0d) + " pending megabytes."));
+ }
+
+ public ReferencedResource<SharedIntermediateSession> retainIntermediate(final IntermediateSessionParams p) {
+ synchronized (this) {
+ if (!isStarted()) {
+ start();
+ }
+ }
+ return intermediatesCreator.retain(intermediateLock, intermediates, p);
+ }
+
+ public ReferencedResource<SharedDestinationSession> retainDestination(final DestinationSessionParams p) {
+ synchronized (this) {
+ if (!isStarted()) {
+ start();
+ }
+ }
+ return destinationsCreator.retain(destinationLock, destinations, p);
+ }
+
+ public ReferencedResource<SharedSourceSession> retainSource(final SourceSessionParams p) {
+ synchronized (this) {
+ if (!isStarted()) {
+ start();
+ }
+ }
+ return sourcesCreator.retain(sourceLock, sources, p);
+ }
+
+ private abstract class SessionCreator<PARAMS, KEY, SESSION extends SharedResource> {
+ abstract SESSION create(PARAMS p);
+
+ abstract KEY buildKey(PARAMS p);
+
+ abstract void logReuse(SESSION session);
+
+ ReferencedResource<SESSION> retain(final Object lock, final Map<KEY, SESSION> registry, final PARAMS p) {
+ SESSION session;
+ ResourceReference sessionReference;
+ final KEY key = buildKey(p);
+ // this lock is held for a horribly long time, but I see no way of
+ // making it slimmer
+ synchronized (lock) {
+ session = registry.get(key);
+ if (session == null) {
+ session = createAndStore(registry, p, key);
+ sessionReference = References.fromResource(session);
+ } else {
+ try {
+ sessionReference = session.refer();
+ logReuse(session);
+ } catch (final IllegalStateException e) {
+ session = createAndStore(registry, p, key);
+ sessionReference = References.fromResource(session);
+ }
+ }
+ }
+ return new ReferencedResource<>(session, sessionReference);
+ }
+
+ SESSION createAndStore(final Map<KEY, SESSION> registry, final PARAMS p, final KEY key) {
+ SESSION session = create(p);
+ registry.put(key, session);
+ return session;
+ }
+
+ }
+
+ private class DestinationSessionCreator
+ extends SessionCreator<DestinationSessionParams, String, SharedDestinationSession> {
+ @Override
+ SharedDestinationSession create(final DestinationSessionParams p) {
+ log.log(LogLevel.DEBUG, "Creating new destination session " + p.getName() + "");
+ return messageBus.newDestinationSession(p);
+ }
+
+ @Override
+ String buildKey(final DestinationSessionParams p) {
+ return p.getName();
+ }
+
+ @Override
+ void logReuse(final SharedDestinationSession session) {
+ log.log(LogLevel.DEBUG, "Reusing destination session " + session.name() + "");
+ }
+ }
+
+ private class SourceSessionCreator
+ extends SessionCreator<SourceSessionParams, SourceSessionKey, SharedSourceSession> {
+
+ @Override
+ SharedSourceSession create(final SourceSessionParams p) {
+ log.log(LogLevel.DEBUG, "Creating new source session.");
+ return messageBus.newSourceSession(p);
+ }
+
+ @Override
+ SourceSessionKey buildKey(final SourceSessionParams p) {
+ return new SourceSessionKey(p);
+ }
+
+ @Override
+ void logReuse(final SharedSourceSession session) {
+ log.log(LogLevel.DEBUG, "Reusing source session.");
+ }
+ }
+
+ private class IntermediateSessionCreator
+ extends SessionCreator<IntermediateSessionParams, String, SharedIntermediateSession> {
+
+ @Override
+ SharedIntermediateSession create(final IntermediateSessionParams p) {
+ log.log(LogLevel.DEBUG, "Creating new intermediate session " + p.getName() + "");
+ return messageBus.newIntermediateSession(p);
+ }
+
+ @Override
+ String buildKey(final IntermediateSessionParams p) {
+ return p.getName();
+ }
+
+ @Override
+ void logReuse(final SharedIntermediateSession session) {
+ log.log(LogLevel.DEBUG, "Reusing intermediate session " + session.name() + "");
+ }
+ }
+
+ static class ThrottlePolicySignature {
+ @Override
+ public int hashCode() {
+ return getClass().hashCode();
+ }
+ }
+
+ static class StaticThrottlePolicySignature extends
+ ThrottlePolicySignature {
+ private final int maxPendingCount;
+ private final long maxPendingSize;
+
+ StaticThrottlePolicySignature(final StaticThrottlePolicy policy) {
+ maxPendingCount = policy.getMaxPendingCount();
+ maxPendingSize = policy.getMaxPendingSize();
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = super.hashCode();
+ result = prime * result + maxPendingCount;
+ result = prime * result
+ + (int) (maxPendingSize ^ (maxPendingSize >>> 32));
+ return result;
+ }
+
+ @Override
+ public boolean equals(final Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ final StaticThrottlePolicySignature other = (StaticThrottlePolicySignature) obj;
+ if (maxPendingCount != other.maxPendingCount) {
+ return false;
+ }
+ if (maxPendingSize != other.maxPendingSize) {
+ return false;
+ }
+ return true;
+ }
+
+ }
+
+ static class DynamicThrottlePolicySignature extends
+ ThrottlePolicySignature {
+ private final int maxPending;
+ private final double maxWindowSize;
+ private final double minWindowSize;
+ private final double windowSizeBackoff;
+ private final double windowSizeIncrement;
+
+ DynamicThrottlePolicySignature(final DynamicThrottlePolicy policy) {
+ maxPending = policy.getMaxPendingCount();
+ maxWindowSize = policy.getMaxWindowSize();
+ minWindowSize = policy.getMinWindowSize();
+ windowSizeBackoff = policy.getWindowSizeBackOff();
+ windowSizeIncrement = policy.getWindowSizeIncrement();
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = super.hashCode();
+ result = prime * result + maxPending;
+ long temp;
+ temp = Double.doubleToLongBits(maxWindowSize);
+ result = prime * result + (int) (temp ^ (temp >>> 32));
+ temp = Double.doubleToLongBits(minWindowSize);
+ result = prime * result + (int) (temp ^ (temp >>> 32));
+ temp = Double.doubleToLongBits(windowSizeBackoff);
+ result = prime * result + (int) (temp ^ (temp >>> 32));
+ temp = Double.doubleToLongBits(windowSizeIncrement);
+ result = prime * result + (int) (temp ^ (temp >>> 32));
+ return result;
+ }
+
+ @Override
+ public boolean equals(final Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ final DynamicThrottlePolicySignature other = (DynamicThrottlePolicySignature) obj;
+ if (maxPending != other.maxPending) {
+ return false;
+ }
+ if (Double.doubleToLongBits(maxWindowSize) != Double
+ .doubleToLongBits(other.maxWindowSize)) {
+ return false;
+ }
+ if (Double.doubleToLongBits(minWindowSize) != Double
+ .doubleToLongBits(other.minWindowSize)) {
+ return false;
+ }
+ if (Double.doubleToLongBits(windowSizeBackoff) != Double
+ .doubleToLongBits(other.windowSizeBackoff)) {
+ return false;
+ }
+ if (Double.doubleToLongBits(windowSizeIncrement) != Double
+ .doubleToLongBits(other.windowSizeIncrement)) {
+ return false;
+ }
+ return true;
+ }
+
+ }
+
+ static class UnknownThrottlePolicySignature extends
+ ThrottlePolicySignature {
+ private final ThrottlePolicy policy;
+
+ UnknownThrottlePolicySignature(final ThrottlePolicy policy) {
+ this.policy = policy;
+ }
+
+ @Override
+ public boolean equals(final Object other) {
+ if (other == null) {
+ return false;
+ }
+ if (other.getClass() != getClass()) {
+ return false;
+ }
+ return ((UnknownThrottlePolicySignature) other).policy == policy;
+ }
+ }
+
+ static class SourceSessionKey {
+ private final double timeout;
+ private final ThrottlePolicySignature policy;
+
+ SourceSessionKey(final SourceSessionParams p) {
+ timeout = p.getTimeout();
+ policy = createSignature(p.getThrottlePolicy());
+ }
+
+ private static ThrottlePolicySignature createSignature(
+ final ThrottlePolicy policy) {
+ final Class<?> policyClass = policy.getClass();
+ if (policyClass == DynamicThrottlePolicy.class) {
+ return new DynamicThrottlePolicySignature(
+ (DynamicThrottlePolicy) policy);
+ } else if (policyClass == StaticThrottlePolicy.class) {
+ return new StaticThrottlePolicySignature(
+ (StaticThrottlePolicy) policy);
+ } else {
+ return new UnknownThrottlePolicySignature(policy);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "SourceSessionKey{" +
+ "timeout=" + timeout +
+ ", policy=" + policy +
+ '}';
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result
+ + ((policy == null) ? 0 : policy.hashCode());
+ long temp;
+ temp = Double.doubleToLongBits(timeout);
+ result = prime * result + (int) (temp ^ (temp >>> 32));
+ return result;
+ }
+
+ @Override
+ public boolean equals(final Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ final SourceSessionKey other = (SourceSessionKey) obj;
+ if (policy == null) {
+ if (other.policy != null) {
+ return false;
+ }
+ } else if (!policy.equals(other.policy)) {
+ return false;
+ }
+ if (Double.doubleToLongBits(timeout) != Double
+ .doubleToLongBits(other.timeout)) {
+ return false;
+ }
+ return true;
+ }
+ }
+}
diff --git a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/package-info.java b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/package-info.java
new file mode 100644
index 00000000000..b7ddc989460
--- /dev/null
+++ b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/package-info.java
@@ -0,0 +1,8 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+/**
+ * TODO
+ */
+@ExportPackage
+package com.yahoo.container.jdisc.messagebus;
+
+import com.yahoo.osgi.annotation.ExportPackage;
diff --git a/container-messagebus/src/main/resources/configdefinitions/container-mbus.def b/container-messagebus/src/main/resources/configdefinitions/container-mbus.def
new file mode 100644
index 00000000000..7c6d1a1d1d3
--- /dev/null
+++ b/container-messagebus/src/main/resources/configdefinitions/container-mbus.def
@@ -0,0 +1,20 @@
+# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+version=2
+namespace=container.jdisc
+
+#settings for message bus in container
+enabled bool default=false
+port int default=0
+maxpendingcount int default=2048
+#maxpendingsize is set in megabytes!
+maxpendingsize int default=100
+
+#The amount of input data that the service can process concurrently
+maxConcurrentFactor double default=0.2 range=[0.0-1.0]
+
+#The factor that an operation grows by in terms of temporary memory usage during deserialization and processing
+documentExpansionFactor double default=80.0
+
+#The headroom left for the container and other stuff, i.e. heap that cannot be used for processing (megabytes)
+containerCoreMemory int default=150
+
diff --git a/container-messagebus/src/main/resources/configdefinitions/session.def b/container-messagebus/src/main/resources/configdefinitions/session.def
new file mode 100644
index 00000000000..c4ba5cb82f5
--- /dev/null
+++ b/container-messagebus/src/main/resources/configdefinitions/session.def
@@ -0,0 +1,7 @@
+# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+version=1
+
+namespace=container.jdisc.config
+
+name string default=""
+type enum {INTERMEDIATE, SOURCE, INTERNAL} default=INTERMEDIATE
diff --git a/container-messagebus/src/test/java/com/yahoo/container/jdisc/messagebus/MbusClientProviderTest.java b/container-messagebus/src/test/java/com/yahoo/container/jdisc/messagebus/MbusClientProviderTest.java
new file mode 100644
index 00000000000..86ad759e2db
--- /dev/null
+++ b/container-messagebus/src/test/java/com/yahoo/container/jdisc/messagebus/MbusClientProviderTest.java
@@ -0,0 +1,37 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.container.jdisc.messagebus;
+
+import com.yahoo.container.jdisc.config.SessionConfig;
+import com.yahoo.container.jdisc.messagebus.MbusClientProvider;
+import com.yahoo.container.jdisc.messagebus.SessionCache;
+import org.junit.Test;
+
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * @author lulf
+ * @since 5.1
+ */
+public class MbusClientProviderTest {
+ @Test
+ public void testIntermediateClient() {
+ SessionConfig.Builder builder = new SessionConfig.Builder();
+ builder.name("foo");
+ builder.type(SessionConfig.Type.Enum.INTERMEDIATE);
+ testClient(new SessionConfig(builder));
+ }
+
+ @Test
+ public void testSourceClient() {
+ SessionConfig.Builder builder = new SessionConfig.Builder();
+ builder.name("foo");
+ builder.type(SessionConfig.Type.Enum.SOURCE);
+ testClient(new SessionConfig(builder));
+ }
+
+ private void testClient(SessionConfig config) {
+ MbusClientProvider p = new MbusClientProvider(new SessionCache("dir:src/test/resources/config/clientprovider"), config);
+ assertNotNull(p.get());
+ p.deconstruct();
+ }
+}
diff --git a/container-messagebus/src/test/java/com/yahoo/container/jdisc/messagebus/MbusSessionKeyTestCase.java b/container-messagebus/src/test/java/com/yahoo/container/jdisc/messagebus/MbusSessionKeyTestCase.java
new file mode 100644
index 00000000000..05336e0416b
--- /dev/null
+++ b/container-messagebus/src/test/java/com/yahoo/container/jdisc/messagebus/MbusSessionKeyTestCase.java
@@ -0,0 +1,113 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.container.jdisc.messagebus;
+
+import com.yahoo.container.jdisc.messagebus.SessionCache.DynamicThrottlePolicySignature;
+import com.yahoo.container.jdisc.messagebus.SessionCache.SourceSessionKey;
+import com.yahoo.container.jdisc.messagebus.SessionCache.StaticThrottlePolicySignature;
+import com.yahoo.container.jdisc.messagebus.SessionCache.UnknownThrottlePolicySignature;
+import com.yahoo.messagebus.DynamicThrottlePolicy;
+import com.yahoo.messagebus.SourceSessionParams;
+import com.yahoo.messagebus.StaticThrottlePolicy;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Check the completeness of the mbus session key classes.
+ *
+ * @author <a href="mailto:steinar@yahoo-inc.com">Steinar Knutsen</a>
+ */
+public class MbusSessionKeyTestCase {
+
+ @Test
+ public final void staticThrottlePolicySignature() {
+ final StaticThrottlePolicy base = new StaticThrottlePolicy();
+ final StaticThrottlePolicy other = new StaticThrottlePolicy();
+ other.setMaxPendingCount(500);
+ other.setMaxPendingSize(500 * 1000 * 1000);
+ base.setMaxPendingCount(1);
+ base.setMaxPendingSize(1000);
+ final StaticThrottlePolicySignature sigBase = new StaticThrottlePolicySignature(
+ base);
+ final StaticThrottlePolicySignature sigOther = new StaticThrottlePolicySignature(
+ other);
+ assertFalse("The policies are different, but signatures are equal.",
+ sigBase.equals(sigOther));
+ assertTrue("Sigs created from same policy evaluated as different.",
+ sigBase.equals(new StaticThrottlePolicySignature(base)));
+ other.setMaxPendingCount(1);
+ other.setMaxPendingSize(1000);
+ assertTrue(
+ "Sigs created from different policies with same settings evaluated as different.",
+ sigBase.equals(new StaticThrottlePolicySignature(other)));
+
+ }
+
+ @Test
+ public final void dynamicThrottlePolicySignature() {
+ final DynamicThrottlePolicy base = new DynamicThrottlePolicy();
+ final DynamicThrottlePolicy other = new DynamicThrottlePolicy();
+ base.setEfficiencyThreshold(5);
+ base.setMaxPendingCount(3);
+ base.setMaxPendingSize(3 * 100);
+ base.setMaxThroughput(1e6);
+ base.setMaxWindowSize(1e9);
+ base.setMinWindowSize(1e5);
+ base.setWeight(1.0);
+ base.setWindowSizeBackOff(.6);
+ base.setWindowSizeIncrement(500);
+ other.setEfficiencyThreshold(5 + 1);
+ other.setMaxPendingCount(3 + 1);
+ other.setMaxPendingSize(3 * 100 + 1);
+ other.setMaxThroughput(1e6 + 1);
+ other.setMaxWindowSize(1e9 + 1);
+ other.setMinWindowSize(1e5 + 1);
+ other.setWeight(1.0 + 1);
+ other.setWindowSizeBackOff(.6 + 1);
+ other.setWindowSizeIncrement(500 + 1);
+ final DynamicThrottlePolicySignature sigBase = new DynamicThrottlePolicySignature(
+ base);
+ final DynamicThrottlePolicySignature sigOther = new DynamicThrottlePolicySignature(
+ other);
+ assertFalse("The policies are different, but signatures are equal.",
+ sigBase.equals(sigOther));
+ assertTrue("Sigs created from same policy evaluated as different.",
+ sigBase.equals(new DynamicThrottlePolicySignature(base)));
+ other.setEfficiencyThreshold(5);
+ other.setMaxPendingCount(3);
+ other.setMaxPendingSize(3 * 100);
+ other.setMaxThroughput(1e6);
+ other.setMaxWindowSize(1e9);
+ other.setMinWindowSize(1e5);
+ other.setWeight(1.0);
+ other.setWindowSizeBackOff(.6);
+ other.setWindowSizeIncrement(500);
+ assertTrue(
+ "Sigs created from different policies with same settings evaluated as different.",
+ sigBase.equals(new DynamicThrottlePolicySignature(other)));
+ }
+
+ @Test
+ public final void unknownThrottlePolicySignature() {
+ final UnknownThrottlePolicySignature baseSig = new UnknownThrottlePolicySignature(
+ new StaticThrottlePolicy());
+ final UnknownThrottlePolicySignature otherSig = new UnknownThrottlePolicySignature(
+ new StaticThrottlePolicy());
+ assertEquals(baseSig, baseSig);
+ assertFalse(otherSig.equals(baseSig));
+ }
+
+ // TODO the session key tests are just smoke tests
+
+ @Test
+ public final void sourceSessionKey() {
+ final SourceSessionParams base = new SourceSessionParams();
+ final SourceSessionParams other = new SourceSessionParams();
+ assertEquals(new SourceSessionKey(base), new SourceSessionKey(other));
+ other.setTimeout(other.getTimeout() + 1);
+ assertFalse(new SourceSessionKey(base).equals(new SourceSessionKey(
+ other)));
+ }
+}
diff --git a/container-messagebus/src/test/resources/config/clientprovider/container-mbus.cfg b/container-messagebus/src/test/resources/config/clientprovider/container-mbus.cfg
new file mode 100644
index 00000000000..e69de29bb2d
--- /dev/null
+++ b/container-messagebus/src/test/resources/config/clientprovider/container-mbus.cfg
diff --git a/container-messagebus/src/test/resources/config/clientprovider/documentmanager.cfg b/container-messagebus/src/test/resources/config/clientprovider/documentmanager.cfg
new file mode 100644
index 00000000000..e69de29bb2d
--- /dev/null
+++ b/container-messagebus/src/test/resources/config/clientprovider/documentmanager.cfg
diff --git a/container-messagebus/src/test/resources/config/clientprovider/load-type.cfg b/container-messagebus/src/test/resources/config/clientprovider/load-type.cfg
new file mode 100644
index 00000000000..e69de29bb2d
--- /dev/null
+++ b/container-messagebus/src/test/resources/config/clientprovider/load-type.cfg
diff --git a/container-messagebus/src/test/resources/config/clientprovider/messagebus.cfg b/container-messagebus/src/test/resources/config/clientprovider/messagebus.cfg
new file mode 100644
index 00000000000..e69de29bb2d
--- /dev/null
+++ b/container-messagebus/src/test/resources/config/clientprovider/messagebus.cfg
diff --git a/container-messagebus/src/test/resources/config/clientprovider/slobroks.cfg b/container-messagebus/src/test/resources/config/clientprovider/slobroks.cfg
new file mode 100644
index 00000000000..e69de29bb2d
--- /dev/null
+++ b/container-messagebus/src/test/resources/config/clientprovider/slobroks.cfg