summaryrefslogtreecommitdiffstats
path: root/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java
diff options
context:
space:
mode:
Diffstat (limited to 'container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java')
-rw-r--r--container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java459
1 files changed, 459 insertions, 0 deletions
diff --git a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java
new file mode 100644
index 00000000000..801cbbf85a0
--- /dev/null
+++ b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java
@@ -0,0 +1,459 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.container.jdisc.messagebus;
+
+import com.yahoo.component.AbstractComponent;
+import com.yahoo.config.subscription.ConfigGetter;
+import com.yahoo.container.jdisc.ContainerMbusConfig;
+import com.yahoo.document.DocumentTypeManager;
+import com.yahoo.document.DocumentUtil;
+import com.yahoo.documentapi.messagebus.loadtypes.LoadTypeSet;
+import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
+import com.yahoo.jdisc.ReferencedResource;
+import com.yahoo.jdisc.References;
+import com.yahoo.jdisc.ResourceReference;
+import com.yahoo.jdisc.SharedResource;
+import com.yahoo.log.LogLevel;
+import com.yahoo.messagebus.*;
+import com.yahoo.messagebus.network.Identity;
+import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
+import com.yahoo.messagebus.shared.SharedDestinationSession;
+import com.yahoo.messagebus.shared.SharedIntermediateSession;
+import com.yahoo.messagebus.shared.SharedMessageBus;
+import com.yahoo.messagebus.shared.SharedSourceSession;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.logging.Logger;
+
+/**
+ * Class to encapsulate access to slobrok sessions.
+ *
+ * @author <a href="mailto:steinar@yahoo-inc.com">Steinar Knutsen</a>
+ * @author <a href="mailto:einarmr@yahoo-inc.com">Einar Rosenvinge</a>
+ */
+public final class SessionCache extends AbstractComponent {
+
+ private static final Logger log = Logger.getLogger(SessionCache.class.getName());
+
+ //config
+ private final String messagebusConfigId;
+ private final String slobrokConfigId;
+ private final String identity;
+ private final String containerMbusConfigId;
+ private final String documentManagerConfigId;
+ private final String loadTypeConfigId;
+ private final DocumentTypeManager documentTypeManager;
+
+ // initialized in start()
+ private ConfigAgent configAgent;
+ private SharedMessageBus messageBus;
+
+ private final Object intermediateLock = new Object();
+ private final Map<String, SharedIntermediateSession> intermediates = new HashMap<>();
+ private final IntermediateSessionCreator intermediatesCreator = new IntermediateSessionCreator();
+
+ private final Object destinationLock = new Object();
+ private final Map<String, SharedDestinationSession> destinations = new HashMap<>();
+ private final DestinationSessionCreator destinationsCreator = new DestinationSessionCreator();
+
+ private final Object sourceLock = new Object();
+ private final Map<SourceSessionKey, SharedSourceSession> sources = new HashMap<>();
+ private final SourceSessionCreator sourcesCreator = new SourceSessionCreator();
+
+ public SessionCache(final String messagebusConfigId, final String slobrokConfigId, final String identity,
+ final String containerMbusConfigId, final String documentManagerConfigId,
+ final String loadTypeConfigId,
+ final DocumentTypeManager documentTypeManager) {
+ this.messagebusConfigId = messagebusConfigId;
+ this.slobrokConfigId = slobrokConfigId;
+ this.identity = identity;
+ this.containerMbusConfigId = containerMbusConfigId;
+ this.documentManagerConfigId = documentManagerConfigId;
+ this.loadTypeConfigId = loadTypeConfigId;
+ this.documentTypeManager = documentTypeManager;
+ }
+
+ public SessionCache(final String identity) {
+ this(identity, identity, identity, identity, identity, identity, new DocumentTypeManager());
+ }
+
+ private void start() {
+ ContainerMbusConfig mbusConfig = ConfigGetter.getConfig(ContainerMbusConfig.class, containerMbusConfigId);
+ if (documentManagerConfigId != null) {
+ documentTypeManager.configure(documentManagerConfigId);
+ }
+ LoadTypeSet loadTypeSet = new LoadTypeSet(loadTypeConfigId);
+ DocumentProtocol protocol = new DocumentProtocol(documentTypeManager, identity, loadTypeSet);
+ messageBus = createSharedMessageBus(mbusConfig, slobrokConfigId, identity, protocol);
+ // TODO: stop doing subscriptions to config when that is to be solved in slobrok as well
+ configAgent = new ConfigAgent(messagebusConfigId, messageBus.messageBus());
+ configAgent.subscribe();
+ }
+
+ private boolean isStarted() {
+ return messageBus != null;
+ }
+
+ private static SharedMessageBus createSharedMessageBus(final ContainerMbusConfig mbusConfig,
+ final String slobrokConfigId, final String identity,
+ Protocol protocol) {
+ final MessageBusParams mbusParams = new MessageBusParams().addProtocol(protocol);
+
+ final int maxPendingSize = DocumentUtil
+ .calculateMaxPendingSize(mbusConfig.maxConcurrentFactor(), mbusConfig.documentExpansionFactor(),
+ mbusConfig.containerCoreMemory());
+ logSystemInfo(mbusConfig, maxPendingSize);
+
+ mbusParams.setMaxPendingCount(mbusConfig.maxpendingcount());
+ mbusParams.setMaxPendingSize(maxPendingSize);
+
+ final RPCNetworkParams netParams = new RPCNetworkParams()
+ .setSlobrokConfigId(slobrokConfigId)
+ .setIdentity(new Identity(identity))
+ .setOOSServerPattern("search/cluster.*/rtx/*/clustercontroller")
+ .setListenPort(mbusConfig.port());
+ return SharedMessageBus.newInstance(mbusParams, netParams);
+ }
+
+ private static void logSystemInfo(ContainerMbusConfig containerMbusConfig, long maxPendingSize) {
+ log.log(LogLevel.DEBUG,
+ "Running with maximum heap size of " + (Runtime.getRuntime().maxMemory() / 1024L / 1024L) + " MB");
+ log.log(LogLevel.CONFIG,
+ "Amount of memory reserved for container core: " + containerMbusConfig.containerCoreMemory() + " MB.");
+ log.log(LogLevel.CONFIG,
+ "Running with document expansion factor " + containerMbusConfig.documentExpansionFactor() + "");
+
+ String msgLimit =
+ (containerMbusConfig.maxpendingcount() == 0) ? "unlimited" : "" + containerMbusConfig.maxpendingcount();
+ log.log(LogLevel.CONFIG, ("Starting message bus with max " + msgLimit + " pending messages and max " +
+ (((double) (maxPendingSize / 1024L)) / 1024.0d) + " pending megabytes."));
+ }
+
+ public ReferencedResource<SharedIntermediateSession> retainIntermediate(final IntermediateSessionParams p) {
+ synchronized (this) {
+ if (!isStarted()) {
+ start();
+ }
+ }
+ return intermediatesCreator.retain(intermediateLock, intermediates, p);
+ }
+
+ public ReferencedResource<SharedDestinationSession> retainDestination(final DestinationSessionParams p) {
+ synchronized (this) {
+ if (!isStarted()) {
+ start();
+ }
+ }
+ return destinationsCreator.retain(destinationLock, destinations, p);
+ }
+
+ public ReferencedResource<SharedSourceSession> retainSource(final SourceSessionParams p) {
+ synchronized (this) {
+ if (!isStarted()) {
+ start();
+ }
+ }
+ return sourcesCreator.retain(sourceLock, sources, p);
+ }
+
+ private abstract class SessionCreator<PARAMS, KEY, SESSION extends SharedResource> {
+ abstract SESSION create(PARAMS p);
+
+ abstract KEY buildKey(PARAMS p);
+
+ abstract void logReuse(SESSION session);
+
+ ReferencedResource<SESSION> retain(final Object lock, final Map<KEY, SESSION> registry, final PARAMS p) {
+ SESSION session;
+ ResourceReference sessionReference;
+ final KEY key = buildKey(p);
+ // this lock is held for a horribly long time, but I see no way of
+ // making it slimmer
+ synchronized (lock) {
+ session = registry.get(key);
+ if (session == null) {
+ session = createAndStore(registry, p, key);
+ sessionReference = References.fromResource(session);
+ } else {
+ try {
+ sessionReference = session.refer();
+ logReuse(session);
+ } catch (final IllegalStateException e) {
+ session = createAndStore(registry, p, key);
+ sessionReference = References.fromResource(session);
+ }
+ }
+ }
+ return new ReferencedResource<>(session, sessionReference);
+ }
+
+ SESSION createAndStore(final Map<KEY, SESSION> registry, final PARAMS p, final KEY key) {
+ SESSION session = create(p);
+ registry.put(key, session);
+ return session;
+ }
+
+ }
+
+ private class DestinationSessionCreator
+ extends SessionCreator<DestinationSessionParams, String, SharedDestinationSession> {
+ @Override
+ SharedDestinationSession create(final DestinationSessionParams p) {
+ log.log(LogLevel.DEBUG, "Creating new destination session " + p.getName() + "");
+ return messageBus.newDestinationSession(p);
+ }
+
+ @Override
+ String buildKey(final DestinationSessionParams p) {
+ return p.getName();
+ }
+
+ @Override
+ void logReuse(final SharedDestinationSession session) {
+ log.log(LogLevel.DEBUG, "Reusing destination session " + session.name() + "");
+ }
+ }
+
+ private class SourceSessionCreator
+ extends SessionCreator<SourceSessionParams, SourceSessionKey, SharedSourceSession> {
+
+ @Override
+ SharedSourceSession create(final SourceSessionParams p) {
+ log.log(LogLevel.DEBUG, "Creating new source session.");
+ return messageBus.newSourceSession(p);
+ }
+
+ @Override
+ SourceSessionKey buildKey(final SourceSessionParams p) {
+ return new SourceSessionKey(p);
+ }
+
+ @Override
+ void logReuse(final SharedSourceSession session) {
+ log.log(LogLevel.DEBUG, "Reusing source session.");
+ }
+ }
+
+ private class IntermediateSessionCreator
+ extends SessionCreator<IntermediateSessionParams, String, SharedIntermediateSession> {
+
+ @Override
+ SharedIntermediateSession create(final IntermediateSessionParams p) {
+ log.log(LogLevel.DEBUG, "Creating new intermediate session " + p.getName() + "");
+ return messageBus.newIntermediateSession(p);
+ }
+
+ @Override
+ String buildKey(final IntermediateSessionParams p) {
+ return p.getName();
+ }
+
+ @Override
+ void logReuse(final SharedIntermediateSession session) {
+ log.log(LogLevel.DEBUG, "Reusing intermediate session " + session.name() + "");
+ }
+ }
+
+ static class ThrottlePolicySignature {
+ @Override
+ public int hashCode() {
+ return getClass().hashCode();
+ }
+ }
+
+ static class StaticThrottlePolicySignature extends
+ ThrottlePolicySignature {
+ private final int maxPendingCount;
+ private final long maxPendingSize;
+
+ StaticThrottlePolicySignature(final StaticThrottlePolicy policy) {
+ maxPendingCount = policy.getMaxPendingCount();
+ maxPendingSize = policy.getMaxPendingSize();
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = super.hashCode();
+ result = prime * result + maxPendingCount;
+ result = prime * result
+ + (int) (maxPendingSize ^ (maxPendingSize >>> 32));
+ return result;
+ }
+
+ @Override
+ public boolean equals(final Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ final StaticThrottlePolicySignature other = (StaticThrottlePolicySignature) obj;
+ if (maxPendingCount != other.maxPendingCount) {
+ return false;
+ }
+ if (maxPendingSize != other.maxPendingSize) {
+ return false;
+ }
+ return true;
+ }
+
+ }
+
+ static class DynamicThrottlePolicySignature extends
+ ThrottlePolicySignature {
+ private final int maxPending;
+ private final double maxWindowSize;
+ private final double minWindowSize;
+ private final double windowSizeBackoff;
+ private final double windowSizeIncrement;
+
+ DynamicThrottlePolicySignature(final DynamicThrottlePolicy policy) {
+ maxPending = policy.getMaxPendingCount();
+ maxWindowSize = policy.getMaxWindowSize();
+ minWindowSize = policy.getMinWindowSize();
+ windowSizeBackoff = policy.getWindowSizeBackOff();
+ windowSizeIncrement = policy.getWindowSizeIncrement();
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = super.hashCode();
+ result = prime * result + maxPending;
+ long temp;
+ temp = Double.doubleToLongBits(maxWindowSize);
+ result = prime * result + (int) (temp ^ (temp >>> 32));
+ temp = Double.doubleToLongBits(minWindowSize);
+ result = prime * result + (int) (temp ^ (temp >>> 32));
+ temp = Double.doubleToLongBits(windowSizeBackoff);
+ result = prime * result + (int) (temp ^ (temp >>> 32));
+ temp = Double.doubleToLongBits(windowSizeIncrement);
+ result = prime * result + (int) (temp ^ (temp >>> 32));
+ return result;
+ }
+
+ @Override
+ public boolean equals(final Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ final DynamicThrottlePolicySignature other = (DynamicThrottlePolicySignature) obj;
+ if (maxPending != other.maxPending) {
+ return false;
+ }
+ if (Double.doubleToLongBits(maxWindowSize) != Double
+ .doubleToLongBits(other.maxWindowSize)) {
+ return false;
+ }
+ if (Double.doubleToLongBits(minWindowSize) != Double
+ .doubleToLongBits(other.minWindowSize)) {
+ return false;
+ }
+ if (Double.doubleToLongBits(windowSizeBackoff) != Double
+ .doubleToLongBits(other.windowSizeBackoff)) {
+ return false;
+ }
+ if (Double.doubleToLongBits(windowSizeIncrement) != Double
+ .doubleToLongBits(other.windowSizeIncrement)) {
+ return false;
+ }
+ return true;
+ }
+
+ }
+
+ static class UnknownThrottlePolicySignature extends
+ ThrottlePolicySignature {
+ private final ThrottlePolicy policy;
+
+ UnknownThrottlePolicySignature(final ThrottlePolicy policy) {
+ this.policy = policy;
+ }
+
+ @Override
+ public boolean equals(final Object other) {
+ if (other == null) {
+ return false;
+ }
+ if (other.getClass() != getClass()) {
+ return false;
+ }
+ return ((UnknownThrottlePolicySignature) other).policy == policy;
+ }
+ }
+
+ static class SourceSessionKey {
+ private final double timeout;
+ private final ThrottlePolicySignature policy;
+
+ SourceSessionKey(final SourceSessionParams p) {
+ timeout = p.getTimeout();
+ policy = createSignature(p.getThrottlePolicy());
+ }
+
+ private static ThrottlePolicySignature createSignature(
+ final ThrottlePolicy policy) {
+ final Class<?> policyClass = policy.getClass();
+ if (policyClass == DynamicThrottlePolicy.class) {
+ return new DynamicThrottlePolicySignature(
+ (DynamicThrottlePolicy) policy);
+ } else if (policyClass == StaticThrottlePolicy.class) {
+ return new StaticThrottlePolicySignature(
+ (StaticThrottlePolicy) policy);
+ } else {
+ return new UnknownThrottlePolicySignature(policy);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "SourceSessionKey{" +
+ "timeout=" + timeout +
+ ", policy=" + policy +
+ '}';
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result
+ + ((policy == null) ? 0 : policy.hashCode());
+ long temp;
+ temp = Double.doubleToLongBits(timeout);
+ result = prime * result + (int) (temp ^ (temp >>> 32));
+ return result;
+ }
+
+ @Override
+ public boolean equals(final Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ final SourceSessionKey other = (SourceSessionKey) obj;
+ if (policy == null) {
+ if (other.policy != null) {
+ return false;
+ }
+ } else if (!policy.equals(other.policy)) {
+ return false;
+ }
+ if (Double.doubleToLongBits(timeout) != Double
+ .doubleToLongBits(other.timeout)) {
+ return false;
+ }
+ return true;
+ }
+ }
+}