diff options
author | Jon Bratseth <bratseth@yahoo-inc.com> | 2016-06-15 23:09:44 +0200 |
---|---|---|
committer | Jon Bratseth <bratseth@yahoo-inc.com> | 2016-06-15 23:09:44 +0200 |
commit | 72231250ed81e10d66bfe70701e64fa5fe50f712 (patch) | |
tree | 2728bba1131a6f6e5bdf95afec7d7ff9358dac50 /container-messagebus/src |
Publish
Diffstat (limited to 'container-messagebus/src')
13 files changed, 770 insertions, 0 deletions
diff --git a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/MbusClientProvider.java b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/MbusClientProvider.java new file mode 100644 index 00000000000..99a4960d25d --- /dev/null +++ b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/MbusClientProvider.java @@ -0,0 +1,71 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.container.jdisc.messagebus; + +import com.google.inject.Inject; +import com.yahoo.container.di.componentgraph.Provider; +import com.yahoo.container.jdisc.config.SessionConfig; +import com.yahoo.jdisc.ReferencedResource; +import com.yahoo.messagebus.AllPassThrottlePolicy; +import com.yahoo.messagebus.IntermediateSessionParams; +import com.yahoo.messagebus.SourceSessionParams; +import com.yahoo.messagebus.jdisc.MbusClient; +import com.yahoo.messagebus.shared.SharedIntermediateSession; +import com.yahoo.messagebus.shared.SharedSourceSession; +import com.yahoo.messagebus.ThrottlePolicy; +import com.yahoo.messagebus.Message; + +import com.yahoo.messagebus.Reply; + +/** + * @author tonytv + * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a> + */ +public class MbusClientProvider implements Provider<MbusClient> { + + private final MbusClient client; + + private static MbusClient createSourceClient( + SessionCache sessionCache, + SessionConfig sessionConfig, + boolean setAllPassThrottlePolicy) { + final SourceSessionParams sourceSessionParams = new SourceSessionParams(); + if (setAllPassThrottlePolicy) { + sourceSessionParams.setThrottlePolicy(new AllPassThrottlePolicy()); + } + try (ReferencedResource<SharedSourceSession> ref = sessionCache.retainSource(sourceSessionParams)) { + return new MbusClient(ref.getResource()); + } + } + + @Inject + public MbusClientProvider(SessionCache sessionCache, SessionConfig sessionConfig) { + switch (sessionConfig.type()) { + case INTERMEDIATE: + final IntermediateSessionParams intermediateSessionParams = + MbusServerProvider.createIntermediateSessionParams(true, sessionConfig.name()); + try (final ReferencedResource<SharedIntermediateSession> ref = + sessionCache.retainIntermediate(intermediateSessionParams)) { + client = new MbusClient(ref.getResource()); + } + break; + case SOURCE: + client = createSourceClient(sessionCache, sessionConfig, false); + break; + case INTERNAL: + client = createSourceClient(sessionCache, sessionConfig, true); + break; + default: + throw new IllegalArgumentException("Unknown session type: " + sessionConfig.type()); + } + } + + @Override + public MbusClient get() { + return client; + } + + @Override + public void deconstruct() { + client.release(); + } +} diff --git a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/MbusServerProvider.java b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/MbusServerProvider.java new file mode 100644 index 00000000000..dcd55f8f1f9 --- /dev/null +++ b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/MbusServerProvider.java @@ -0,0 +1,55 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.container.jdisc.messagebus; + +import com.yahoo.component.ComponentId; +import com.yahoo.container.di.componentgraph.Provider; +import com.yahoo.jdisc.ReferencedResource; +import com.yahoo.jdisc.service.CurrentContainer; +import com.yahoo.log.LogLevel; +import com.yahoo.messagebus.IntermediateSessionParams; +import com.yahoo.messagebus.jdisc.MbusServer; +import com.yahoo.messagebus.shared.SharedIntermediateSession; + +import java.util.logging.Logger; + +/** + * TODO: Javadoc + * + * @author tonytv + */ +public class MbusServerProvider implements Provider<MbusServer> { + private static final Logger log = Logger.getLogger(MbusServerProvider.class.getName()); + + private final MbusServer server; + private final ReferencedResource<SharedIntermediateSession> sessionRef; + + public MbusServerProvider(ComponentId id, SessionCache sessionCache, CurrentContainer currentContainer) { + ComponentId chainId = id.withoutNamespace(); //TODO: this should be a config value instead. + sessionRef = sessionCache.retainIntermediate(createIntermediateSessionParams(true, chainId.stringValue())); + server = new MbusServer(currentContainer, sessionRef.getResource()); + } + + static IntermediateSessionParams createIntermediateSessionParams(boolean broadcastName, String name) { + IntermediateSessionParams intermediateParams = new IntermediateSessionParams(); + intermediateParams.setBroadcastName(broadcastName); + intermediateParams.setName(name); + return intermediateParams; + } + + public SharedIntermediateSession getSession() { + return sessionRef.getResource(); + } + + @Override + public MbusServer get() { + return server; + } + + @Override + public void deconstruct() { + log.log(LogLevel.INFO, "Deconstructing mbus server: " + server); + server.close(); + server.release(); + sessionRef.getReference().close(); + } +} diff --git a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java new file mode 100644 index 00000000000..801cbbf85a0 --- /dev/null +++ b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java @@ -0,0 +1,459 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.container.jdisc.messagebus; + +import com.yahoo.component.AbstractComponent; +import com.yahoo.config.subscription.ConfigGetter; +import com.yahoo.container.jdisc.ContainerMbusConfig; +import com.yahoo.document.DocumentTypeManager; +import com.yahoo.document.DocumentUtil; +import com.yahoo.documentapi.messagebus.loadtypes.LoadTypeSet; +import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; +import com.yahoo.jdisc.ReferencedResource; +import com.yahoo.jdisc.References; +import com.yahoo.jdisc.ResourceReference; +import com.yahoo.jdisc.SharedResource; +import com.yahoo.log.LogLevel; +import com.yahoo.messagebus.*; +import com.yahoo.messagebus.network.Identity; +import com.yahoo.messagebus.network.rpc.RPCNetworkParams; +import com.yahoo.messagebus.shared.SharedDestinationSession; +import com.yahoo.messagebus.shared.SharedIntermediateSession; +import com.yahoo.messagebus.shared.SharedMessageBus; +import com.yahoo.messagebus.shared.SharedSourceSession; + +import java.util.HashMap; +import java.util.Map; +import java.util.logging.Logger; + +/** + * Class to encapsulate access to slobrok sessions. + * + * @author <a href="mailto:steinar@yahoo-inc.com">Steinar Knutsen</a> + * @author <a href="mailto:einarmr@yahoo-inc.com">Einar Rosenvinge</a> + */ +public final class SessionCache extends AbstractComponent { + + private static final Logger log = Logger.getLogger(SessionCache.class.getName()); + + //config + private final String messagebusConfigId; + private final String slobrokConfigId; + private final String identity; + private final String containerMbusConfigId; + private final String documentManagerConfigId; + private final String loadTypeConfigId; + private final DocumentTypeManager documentTypeManager; + + // initialized in start() + private ConfigAgent configAgent; + private SharedMessageBus messageBus; + + private final Object intermediateLock = new Object(); + private final Map<String, SharedIntermediateSession> intermediates = new HashMap<>(); + private final IntermediateSessionCreator intermediatesCreator = new IntermediateSessionCreator(); + + private final Object destinationLock = new Object(); + private final Map<String, SharedDestinationSession> destinations = new HashMap<>(); + private final DestinationSessionCreator destinationsCreator = new DestinationSessionCreator(); + + private final Object sourceLock = new Object(); + private final Map<SourceSessionKey, SharedSourceSession> sources = new HashMap<>(); + private final SourceSessionCreator sourcesCreator = new SourceSessionCreator(); + + public SessionCache(final String messagebusConfigId, final String slobrokConfigId, final String identity, + final String containerMbusConfigId, final String documentManagerConfigId, + final String loadTypeConfigId, + final DocumentTypeManager documentTypeManager) { + this.messagebusConfigId = messagebusConfigId; + this.slobrokConfigId = slobrokConfigId; + this.identity = identity; + this.containerMbusConfigId = containerMbusConfigId; + this.documentManagerConfigId = documentManagerConfigId; + this.loadTypeConfigId = loadTypeConfigId; + this.documentTypeManager = documentTypeManager; + } + + public SessionCache(final String identity) { + this(identity, identity, identity, identity, identity, identity, new DocumentTypeManager()); + } + + private void start() { + ContainerMbusConfig mbusConfig = ConfigGetter.getConfig(ContainerMbusConfig.class, containerMbusConfigId); + if (documentManagerConfigId != null) { + documentTypeManager.configure(documentManagerConfigId); + } + LoadTypeSet loadTypeSet = new LoadTypeSet(loadTypeConfigId); + DocumentProtocol protocol = new DocumentProtocol(documentTypeManager, identity, loadTypeSet); + messageBus = createSharedMessageBus(mbusConfig, slobrokConfigId, identity, protocol); + // TODO: stop doing subscriptions to config when that is to be solved in slobrok as well + configAgent = new ConfigAgent(messagebusConfigId, messageBus.messageBus()); + configAgent.subscribe(); + } + + private boolean isStarted() { + return messageBus != null; + } + + private static SharedMessageBus createSharedMessageBus(final ContainerMbusConfig mbusConfig, + final String slobrokConfigId, final String identity, + Protocol protocol) { + final MessageBusParams mbusParams = new MessageBusParams().addProtocol(protocol); + + final int maxPendingSize = DocumentUtil + .calculateMaxPendingSize(mbusConfig.maxConcurrentFactor(), mbusConfig.documentExpansionFactor(), + mbusConfig.containerCoreMemory()); + logSystemInfo(mbusConfig, maxPendingSize); + + mbusParams.setMaxPendingCount(mbusConfig.maxpendingcount()); + mbusParams.setMaxPendingSize(maxPendingSize); + + final RPCNetworkParams netParams = new RPCNetworkParams() + .setSlobrokConfigId(slobrokConfigId) + .setIdentity(new Identity(identity)) + .setOOSServerPattern("search/cluster.*/rtx/*/clustercontroller") + .setListenPort(mbusConfig.port()); + return SharedMessageBus.newInstance(mbusParams, netParams); + } + + private static void logSystemInfo(ContainerMbusConfig containerMbusConfig, long maxPendingSize) { + log.log(LogLevel.DEBUG, + "Running with maximum heap size of " + (Runtime.getRuntime().maxMemory() / 1024L / 1024L) + " MB"); + log.log(LogLevel.CONFIG, + "Amount of memory reserved for container core: " + containerMbusConfig.containerCoreMemory() + " MB."); + log.log(LogLevel.CONFIG, + "Running with document expansion factor " + containerMbusConfig.documentExpansionFactor() + ""); + + String msgLimit = + (containerMbusConfig.maxpendingcount() == 0) ? "unlimited" : "" + containerMbusConfig.maxpendingcount(); + log.log(LogLevel.CONFIG, ("Starting message bus with max " + msgLimit + " pending messages and max " + + (((double) (maxPendingSize / 1024L)) / 1024.0d) + " pending megabytes.")); + } + + public ReferencedResource<SharedIntermediateSession> retainIntermediate(final IntermediateSessionParams p) { + synchronized (this) { + if (!isStarted()) { + start(); + } + } + return intermediatesCreator.retain(intermediateLock, intermediates, p); + } + + public ReferencedResource<SharedDestinationSession> retainDestination(final DestinationSessionParams p) { + synchronized (this) { + if (!isStarted()) { + start(); + } + } + return destinationsCreator.retain(destinationLock, destinations, p); + } + + public ReferencedResource<SharedSourceSession> retainSource(final SourceSessionParams p) { + synchronized (this) { + if (!isStarted()) { + start(); + } + } + return sourcesCreator.retain(sourceLock, sources, p); + } + + private abstract class SessionCreator<PARAMS, KEY, SESSION extends SharedResource> { + abstract SESSION create(PARAMS p); + + abstract KEY buildKey(PARAMS p); + + abstract void logReuse(SESSION session); + + ReferencedResource<SESSION> retain(final Object lock, final Map<KEY, SESSION> registry, final PARAMS p) { + SESSION session; + ResourceReference sessionReference; + final KEY key = buildKey(p); + // this lock is held for a horribly long time, but I see no way of + // making it slimmer + synchronized (lock) { + session = registry.get(key); + if (session == null) { + session = createAndStore(registry, p, key); + sessionReference = References.fromResource(session); + } else { + try { + sessionReference = session.refer(); + logReuse(session); + } catch (final IllegalStateException e) { + session = createAndStore(registry, p, key); + sessionReference = References.fromResource(session); + } + } + } + return new ReferencedResource<>(session, sessionReference); + } + + SESSION createAndStore(final Map<KEY, SESSION> registry, final PARAMS p, final KEY key) { + SESSION session = create(p); + registry.put(key, session); + return session; + } + + } + + private class DestinationSessionCreator + extends SessionCreator<DestinationSessionParams, String, SharedDestinationSession> { + @Override + SharedDestinationSession create(final DestinationSessionParams p) { + log.log(LogLevel.DEBUG, "Creating new destination session " + p.getName() + ""); + return messageBus.newDestinationSession(p); + } + + @Override + String buildKey(final DestinationSessionParams p) { + return p.getName(); + } + + @Override + void logReuse(final SharedDestinationSession session) { + log.log(LogLevel.DEBUG, "Reusing destination session " + session.name() + ""); + } + } + + private class SourceSessionCreator + extends SessionCreator<SourceSessionParams, SourceSessionKey, SharedSourceSession> { + + @Override + SharedSourceSession create(final SourceSessionParams p) { + log.log(LogLevel.DEBUG, "Creating new source session."); + return messageBus.newSourceSession(p); + } + + @Override + SourceSessionKey buildKey(final SourceSessionParams p) { + return new SourceSessionKey(p); + } + + @Override + void logReuse(final SharedSourceSession session) { + log.log(LogLevel.DEBUG, "Reusing source session."); + } + } + + private class IntermediateSessionCreator + extends SessionCreator<IntermediateSessionParams, String, SharedIntermediateSession> { + + @Override + SharedIntermediateSession create(final IntermediateSessionParams p) { + log.log(LogLevel.DEBUG, "Creating new intermediate session " + p.getName() + ""); + return messageBus.newIntermediateSession(p); + } + + @Override + String buildKey(final IntermediateSessionParams p) { + return p.getName(); + } + + @Override + void logReuse(final SharedIntermediateSession session) { + log.log(LogLevel.DEBUG, "Reusing intermediate session " + session.name() + ""); + } + } + + static class ThrottlePolicySignature { + @Override + public int hashCode() { + return getClass().hashCode(); + } + } + + static class StaticThrottlePolicySignature extends + ThrottlePolicySignature { + private final int maxPendingCount; + private final long maxPendingSize; + + StaticThrottlePolicySignature(final StaticThrottlePolicy policy) { + maxPendingCount = policy.getMaxPendingCount(); + maxPendingSize = policy.getMaxPendingSize(); + } + + @Override + public int hashCode() { + final int prime = 31; + int result = super.hashCode(); + result = prime * result + maxPendingCount; + result = prime * result + + (int) (maxPendingSize ^ (maxPendingSize >>> 32)); + return result; + } + + @Override + public boolean equals(final Object obj) { + if (this == obj) { + return true; + } + if (getClass() != obj.getClass()) { + return false; + } + final StaticThrottlePolicySignature other = (StaticThrottlePolicySignature) obj; + if (maxPendingCount != other.maxPendingCount) { + return false; + } + if (maxPendingSize != other.maxPendingSize) { + return false; + } + return true; + } + + } + + static class DynamicThrottlePolicySignature extends + ThrottlePolicySignature { + private final int maxPending; + private final double maxWindowSize; + private final double minWindowSize; + private final double windowSizeBackoff; + private final double windowSizeIncrement; + + DynamicThrottlePolicySignature(final DynamicThrottlePolicy policy) { + maxPending = policy.getMaxPendingCount(); + maxWindowSize = policy.getMaxWindowSize(); + minWindowSize = policy.getMinWindowSize(); + windowSizeBackoff = policy.getWindowSizeBackOff(); + windowSizeIncrement = policy.getWindowSizeIncrement(); + } + + @Override + public int hashCode() { + final int prime = 31; + int result = super.hashCode(); + result = prime * result + maxPending; + long temp; + temp = Double.doubleToLongBits(maxWindowSize); + result = prime * result + (int) (temp ^ (temp >>> 32)); + temp = Double.doubleToLongBits(minWindowSize); + result = prime * result + (int) (temp ^ (temp >>> 32)); + temp = Double.doubleToLongBits(windowSizeBackoff); + result = prime * result + (int) (temp ^ (temp >>> 32)); + temp = Double.doubleToLongBits(windowSizeIncrement); + result = prime * result + (int) (temp ^ (temp >>> 32)); + return result; + } + + @Override + public boolean equals(final Object obj) { + if (this == obj) { + return true; + } + if (getClass() != obj.getClass()) { + return false; + } + final DynamicThrottlePolicySignature other = (DynamicThrottlePolicySignature) obj; + if (maxPending != other.maxPending) { + return false; + } + if (Double.doubleToLongBits(maxWindowSize) != Double + .doubleToLongBits(other.maxWindowSize)) { + return false; + } + if (Double.doubleToLongBits(minWindowSize) != Double + .doubleToLongBits(other.minWindowSize)) { + return false; + } + if (Double.doubleToLongBits(windowSizeBackoff) != Double + .doubleToLongBits(other.windowSizeBackoff)) { + return false; + } + if (Double.doubleToLongBits(windowSizeIncrement) != Double + .doubleToLongBits(other.windowSizeIncrement)) { + return false; + } + return true; + } + + } + + static class UnknownThrottlePolicySignature extends + ThrottlePolicySignature { + private final ThrottlePolicy policy; + + UnknownThrottlePolicySignature(final ThrottlePolicy policy) { + this.policy = policy; + } + + @Override + public boolean equals(final Object other) { + if (other == null) { + return false; + } + if (other.getClass() != getClass()) { + return false; + } + return ((UnknownThrottlePolicySignature) other).policy == policy; + } + } + + static class SourceSessionKey { + private final double timeout; + private final ThrottlePolicySignature policy; + + SourceSessionKey(final SourceSessionParams p) { + timeout = p.getTimeout(); + policy = createSignature(p.getThrottlePolicy()); + } + + private static ThrottlePolicySignature createSignature( + final ThrottlePolicy policy) { + final Class<?> policyClass = policy.getClass(); + if (policyClass == DynamicThrottlePolicy.class) { + return new DynamicThrottlePolicySignature( + (DynamicThrottlePolicy) policy); + } else if (policyClass == StaticThrottlePolicy.class) { + return new StaticThrottlePolicySignature( + (StaticThrottlePolicy) policy); + } else { + return new UnknownThrottlePolicySignature(policy); + } + } + + @Override + public String toString() { + return "SourceSessionKey{" + + "timeout=" + timeout + + ", policy=" + policy + + '}'; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + + ((policy == null) ? 0 : policy.hashCode()); + long temp; + temp = Double.doubleToLongBits(timeout); + result = prime * result + (int) (temp ^ (temp >>> 32)); + return result; + } + + @Override + public boolean equals(final Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + final SourceSessionKey other = (SourceSessionKey) obj; + if (policy == null) { + if (other.policy != null) { + return false; + } + } else if (!policy.equals(other.policy)) { + return false; + } + if (Double.doubleToLongBits(timeout) != Double + .doubleToLongBits(other.timeout)) { + return false; + } + return true; + } + } +} diff --git a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/package-info.java b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/package-info.java new file mode 100644 index 00000000000..b7ddc989460 --- /dev/null +++ b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/package-info.java @@ -0,0 +1,8 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +/** + * TODO + */ +@ExportPackage +package com.yahoo.container.jdisc.messagebus; + +import com.yahoo.osgi.annotation.ExportPackage; diff --git a/container-messagebus/src/main/resources/configdefinitions/container-mbus.def b/container-messagebus/src/main/resources/configdefinitions/container-mbus.def new file mode 100644 index 00000000000..7c6d1a1d1d3 --- /dev/null +++ b/container-messagebus/src/main/resources/configdefinitions/container-mbus.def @@ -0,0 +1,20 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +version=2 +namespace=container.jdisc + +#settings for message bus in container +enabled bool default=false +port int default=0 +maxpendingcount int default=2048 +#maxpendingsize is set in megabytes! +maxpendingsize int default=100 + +#The amount of input data that the service can process concurrently +maxConcurrentFactor double default=0.2 range=[0.0-1.0] + +#The factor that an operation grows by in terms of temporary memory usage during deserialization and processing +documentExpansionFactor double default=80.0 + +#The headroom left for the container and other stuff, i.e. heap that cannot be used for processing (megabytes) +containerCoreMemory int default=150 + diff --git a/container-messagebus/src/main/resources/configdefinitions/session.def b/container-messagebus/src/main/resources/configdefinitions/session.def new file mode 100644 index 00000000000..c4ba5cb82f5 --- /dev/null +++ b/container-messagebus/src/main/resources/configdefinitions/session.def @@ -0,0 +1,7 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +version=1 + +namespace=container.jdisc.config + +name string default="" +type enum {INTERMEDIATE, SOURCE, INTERNAL} default=INTERMEDIATE diff --git a/container-messagebus/src/test/java/com/yahoo/container/jdisc/messagebus/MbusClientProviderTest.java b/container-messagebus/src/test/java/com/yahoo/container/jdisc/messagebus/MbusClientProviderTest.java new file mode 100644 index 00000000000..86ad759e2db --- /dev/null +++ b/container-messagebus/src/test/java/com/yahoo/container/jdisc/messagebus/MbusClientProviderTest.java @@ -0,0 +1,37 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.container.jdisc.messagebus; + +import com.yahoo.container.jdisc.config.SessionConfig; +import com.yahoo.container.jdisc.messagebus.MbusClientProvider; +import com.yahoo.container.jdisc.messagebus.SessionCache; +import org.junit.Test; + +import static org.junit.Assert.assertNotNull; + +/** + * @author lulf + * @since 5.1 + */ +public class MbusClientProviderTest { + @Test + public void testIntermediateClient() { + SessionConfig.Builder builder = new SessionConfig.Builder(); + builder.name("foo"); + builder.type(SessionConfig.Type.Enum.INTERMEDIATE); + testClient(new SessionConfig(builder)); + } + + @Test + public void testSourceClient() { + SessionConfig.Builder builder = new SessionConfig.Builder(); + builder.name("foo"); + builder.type(SessionConfig.Type.Enum.SOURCE); + testClient(new SessionConfig(builder)); + } + + private void testClient(SessionConfig config) { + MbusClientProvider p = new MbusClientProvider(new SessionCache("dir:src/test/resources/config/clientprovider"), config); + assertNotNull(p.get()); + p.deconstruct(); + } +} diff --git a/container-messagebus/src/test/java/com/yahoo/container/jdisc/messagebus/MbusSessionKeyTestCase.java b/container-messagebus/src/test/java/com/yahoo/container/jdisc/messagebus/MbusSessionKeyTestCase.java new file mode 100644 index 00000000000..05336e0416b --- /dev/null +++ b/container-messagebus/src/test/java/com/yahoo/container/jdisc/messagebus/MbusSessionKeyTestCase.java @@ -0,0 +1,113 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.container.jdisc.messagebus; + +import com.yahoo.container.jdisc.messagebus.SessionCache.DynamicThrottlePolicySignature; +import com.yahoo.container.jdisc.messagebus.SessionCache.SourceSessionKey; +import com.yahoo.container.jdisc.messagebus.SessionCache.StaticThrottlePolicySignature; +import com.yahoo.container.jdisc.messagebus.SessionCache.UnknownThrottlePolicySignature; +import com.yahoo.messagebus.DynamicThrottlePolicy; +import com.yahoo.messagebus.SourceSessionParams; +import com.yahoo.messagebus.StaticThrottlePolicy; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * Check the completeness of the mbus session key classes. + * + * @author <a href="mailto:steinar@yahoo-inc.com">Steinar Knutsen</a> + */ +public class MbusSessionKeyTestCase { + + @Test + public final void staticThrottlePolicySignature() { + final StaticThrottlePolicy base = new StaticThrottlePolicy(); + final StaticThrottlePolicy other = new StaticThrottlePolicy(); + other.setMaxPendingCount(500); + other.setMaxPendingSize(500 * 1000 * 1000); + base.setMaxPendingCount(1); + base.setMaxPendingSize(1000); + final StaticThrottlePolicySignature sigBase = new StaticThrottlePolicySignature( + base); + final StaticThrottlePolicySignature sigOther = new StaticThrottlePolicySignature( + other); + assertFalse("The policies are different, but signatures are equal.", + sigBase.equals(sigOther)); + assertTrue("Sigs created from same policy evaluated as different.", + sigBase.equals(new StaticThrottlePolicySignature(base))); + other.setMaxPendingCount(1); + other.setMaxPendingSize(1000); + assertTrue( + "Sigs created from different policies with same settings evaluated as different.", + sigBase.equals(new StaticThrottlePolicySignature(other))); + + } + + @Test + public final void dynamicThrottlePolicySignature() { + final DynamicThrottlePolicy base = new DynamicThrottlePolicy(); + final DynamicThrottlePolicy other = new DynamicThrottlePolicy(); + base.setEfficiencyThreshold(5); + base.setMaxPendingCount(3); + base.setMaxPendingSize(3 * 100); + base.setMaxThroughput(1e6); + base.setMaxWindowSize(1e9); + base.setMinWindowSize(1e5); + base.setWeight(1.0); + base.setWindowSizeBackOff(.6); + base.setWindowSizeIncrement(500); + other.setEfficiencyThreshold(5 + 1); + other.setMaxPendingCount(3 + 1); + other.setMaxPendingSize(3 * 100 + 1); + other.setMaxThroughput(1e6 + 1); + other.setMaxWindowSize(1e9 + 1); + other.setMinWindowSize(1e5 + 1); + other.setWeight(1.0 + 1); + other.setWindowSizeBackOff(.6 + 1); + other.setWindowSizeIncrement(500 + 1); + final DynamicThrottlePolicySignature sigBase = new DynamicThrottlePolicySignature( + base); + final DynamicThrottlePolicySignature sigOther = new DynamicThrottlePolicySignature( + other); + assertFalse("The policies are different, but signatures are equal.", + sigBase.equals(sigOther)); + assertTrue("Sigs created from same policy evaluated as different.", + sigBase.equals(new DynamicThrottlePolicySignature(base))); + other.setEfficiencyThreshold(5); + other.setMaxPendingCount(3); + other.setMaxPendingSize(3 * 100); + other.setMaxThroughput(1e6); + other.setMaxWindowSize(1e9); + other.setMinWindowSize(1e5); + other.setWeight(1.0); + other.setWindowSizeBackOff(.6); + other.setWindowSizeIncrement(500); + assertTrue( + "Sigs created from different policies with same settings evaluated as different.", + sigBase.equals(new DynamicThrottlePolicySignature(other))); + } + + @Test + public final void unknownThrottlePolicySignature() { + final UnknownThrottlePolicySignature baseSig = new UnknownThrottlePolicySignature( + new StaticThrottlePolicy()); + final UnknownThrottlePolicySignature otherSig = new UnknownThrottlePolicySignature( + new StaticThrottlePolicy()); + assertEquals(baseSig, baseSig); + assertFalse(otherSig.equals(baseSig)); + } + + // TODO the session key tests are just smoke tests + + @Test + public final void sourceSessionKey() { + final SourceSessionParams base = new SourceSessionParams(); + final SourceSessionParams other = new SourceSessionParams(); + assertEquals(new SourceSessionKey(base), new SourceSessionKey(other)); + other.setTimeout(other.getTimeout() + 1); + assertFalse(new SourceSessionKey(base).equals(new SourceSessionKey( + other))); + } +} diff --git a/container-messagebus/src/test/resources/config/clientprovider/container-mbus.cfg b/container-messagebus/src/test/resources/config/clientprovider/container-mbus.cfg new file mode 100644 index 00000000000..e69de29bb2d --- /dev/null +++ b/container-messagebus/src/test/resources/config/clientprovider/container-mbus.cfg diff --git a/container-messagebus/src/test/resources/config/clientprovider/documentmanager.cfg b/container-messagebus/src/test/resources/config/clientprovider/documentmanager.cfg new file mode 100644 index 00000000000..e69de29bb2d --- /dev/null +++ b/container-messagebus/src/test/resources/config/clientprovider/documentmanager.cfg diff --git a/container-messagebus/src/test/resources/config/clientprovider/load-type.cfg b/container-messagebus/src/test/resources/config/clientprovider/load-type.cfg new file mode 100644 index 00000000000..e69de29bb2d --- /dev/null +++ b/container-messagebus/src/test/resources/config/clientprovider/load-type.cfg diff --git a/container-messagebus/src/test/resources/config/clientprovider/messagebus.cfg b/container-messagebus/src/test/resources/config/clientprovider/messagebus.cfg new file mode 100644 index 00000000000..e69de29bb2d --- /dev/null +++ b/container-messagebus/src/test/resources/config/clientprovider/messagebus.cfg diff --git a/container-messagebus/src/test/resources/config/clientprovider/slobroks.cfg b/container-messagebus/src/test/resources/config/clientprovider/slobroks.cfg new file mode 100644 index 00000000000..e69de29bb2d --- /dev/null +++ b/container-messagebus/src/test/resources/config/clientprovider/slobroks.cfg |