diff options
Diffstat (limited to 'container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java')
-rw-r--r-- | container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java | 114 |
1 files changed, 50 insertions, 64 deletions
diff --git a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java index d65b2f7cc12..ab3d09af178 100644 --- a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java +++ b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java @@ -1,22 +1,24 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.container.jdisc.messagebus; +import com.google.inject.Inject; +import com.yahoo.cloud.config.SlobroksConfig; import com.yahoo.component.AbstractComponent; -import com.yahoo.config.subscription.ConfigGetter; import com.yahoo.container.jdisc.ContainerMbusConfig; import com.yahoo.document.DocumentTypeManager; import com.yahoo.document.DocumentUtil; +import com.yahoo.document.config.DocumentmanagerConfig; import com.yahoo.documentapi.messagebus.loadtypes.LoadTypeSet; import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; +import com.yahoo.documentapi.messagebus.protocol.DocumentProtocolPoliciesConfig; import com.yahoo.jdisc.ReferencedResource; import com.yahoo.jdisc.References; import com.yahoo.jdisc.ResourceReference; import com.yahoo.jdisc.SharedResource; -import java.util.logging.Level; -import com.yahoo.messagebus.ConfigAgent; import com.yahoo.messagebus.DynamicThrottlePolicy; import com.yahoo.messagebus.IntermediateSessionParams; import com.yahoo.messagebus.MessageBusParams; +import com.yahoo.messagebus.MessagebusConfig; import com.yahoo.messagebus.Protocol; import com.yahoo.messagebus.SourceSessionParams; import com.yahoo.messagebus.StaticThrottlePolicy; @@ -26,9 +28,12 @@ import com.yahoo.messagebus.network.rpc.RPCNetworkParams; import com.yahoo.messagebus.shared.SharedIntermediateSession; import com.yahoo.messagebus.shared.SharedMessageBus; import com.yahoo.messagebus.shared.SharedSourceSession; +import com.yahoo.vespa.config.content.DistributionConfig; +import com.yahoo.vespa.config.content.LoadTypeConfig; import java.util.HashMap; import java.util.Map; +import java.util.logging.Level; import java.util.logging.Logger; /** @@ -37,24 +42,19 @@ import java.util.logging.Logger; * @author Steinar Knutsen * @author Einar Rosenvinge */ -// TODO jonmv: Remove this? Only used sensibly by FeedHandlerV3, where only timeout varies. +// TODO jonmv: Remove this: only used with more than one entry by FeedHandlerV3, where only timeout varies. +// rant: This whole construct is because DI at one point didn't exist, so getting hold of a shared resource +// or session was hard(?), and one resorted to routing through the Container, using URIs, to the correct +// MbusClient, with or without throttling. This introduced the problem of ownership during shutdown, +// which was solved with manual reference counting. This is all much better solved with DI, which (now) +// owns everything, and does component shutdown in reverse construction order, which is always right. +// So for the sake everyone's mental health, this should all just be removed now! I suspect this is +// even the case for Request; we can track in handlers, and warn when requests have been misplaced. public final class SessionCache extends AbstractComponent { private static final Logger log = Logger.getLogger(SessionCache.class.getName()); - //config - private final String messagebusConfigId; - private final String slobrokConfigId; - private final String identity; - private final String containerMbusConfigId; - private final String documentManagerConfigId; - private final String loadTypeConfigId; - private final DocumentTypeManager documentTypeManager; - - // initialized in start() - private ConfigAgent configAgent; - private SharedMessageBus messageBus; - + private final SharedMessageBus messageBus; private final Object intermediateLock = new Object(); private final Map<String, SharedIntermediateSession> intermediates = new HashMap<>(); private final IntermediateSessionCreator intermediatesCreator = new IntermediateSessionCreator(); @@ -63,52 +63,48 @@ public final class SessionCache extends AbstractComponent { private final Map<SourceSessionKey, SharedSourceSession> sources = new HashMap<>(); private final SourceSessionCreator sourcesCreator = new SourceSessionCreator(); - public SessionCache(String messagebusConfigId, String slobrokConfigId, String identity, - String containerMbusConfigId, String documentManagerConfigId, - String loadTypeConfigId, - DocumentTypeManager documentTypeManager) { - this.messagebusConfigId = messagebusConfigId; - this.slobrokConfigId = slobrokConfigId; - this.identity = identity; - this.containerMbusConfigId = containerMbusConfigId; - this.documentManagerConfigId = documentManagerConfigId; - this.loadTypeConfigId = loadTypeConfigId; - this.documentTypeManager = documentTypeManager; + @Inject + public SessionCache(ContainerMbusConfig containerMbusConfig, DocumentmanagerConfig documentmanagerConfig, + LoadTypeConfig loadTypeConfig, SlobroksConfig slobroksConfig, + MessagebusConfig messagebusConfig, DocumentProtocolPoliciesConfig policiesConfig, + DistributionConfig distributionConfig) { + this(containerMbusConfig, documentmanagerConfig, loadTypeConfig, slobroksConfig, + messagebusConfig, policiesConfig, distributionConfig, System.getProperty("config.id")); //: } - public SessionCache(final String identity) { - this(identity, identity, identity, identity, identity, identity, new DocumentTypeManager()); - } - - public void deconstruct() { - if (configAgent != null) { - configAgent.shutdown(); - } + public SessionCache(ContainerMbusConfig containerMbusConfig, DocumentmanagerConfig documentmanagerConfig, + LoadTypeConfig loadTypeConfig, SlobroksConfig slobroksConfig, + MessagebusConfig messagebusConfig, DocumentProtocolPoliciesConfig policiesConfig, + DistributionConfig distributionConfig, String identity) { + this.messageBus = createSharedMessageBus(containerMbusConfig, + messagebusConfig, + slobroksConfig, + identity, + new DocumentProtocol(new DocumentTypeManager(documentmanagerConfig), + new LoadTypeSet(loadTypeConfig), + policiesConfig, + distributionConfig)); } - - private void start() { - ContainerMbusConfig mbusConfig = ConfigGetter.getConfig(ContainerMbusConfig.class, containerMbusConfigId); - if (documentManagerConfigId != null) { - documentTypeManager.configure(documentManagerConfigId); - } - LoadTypeSet loadTypeSet = new LoadTypeSet(loadTypeConfigId); - DocumentProtocol protocol = new DocumentProtocol(documentTypeManager, identity, loadTypeSet); - messageBus = createSharedMessageBus(mbusConfig, slobrokConfigId, identity, protocol); - // TODO: stop doing subscriptions to config when that is to be solved in slobrok as well - configAgent = new ConfigAgent(messagebusConfigId, messageBus.messageBus()); - configAgent.subscribe(); + public SessionCache(ContainerMbusConfig containerMbusConfig, SlobroksConfig slobroksConfig, + MessagebusConfig messagebusConfig, String identity, Protocol protocol) { + this.messageBus = createSharedMessageBus(containerMbusConfig, + messagebusConfig, + slobroksConfig, + identity, + protocol); } - - private boolean isStarted() { - return messageBus != null; + public void deconstruct() { + messageBus.release(); } private static SharedMessageBus createSharedMessageBus(ContainerMbusConfig mbusConfig, - String slobrokConfigId, String identity, + MessagebusConfig messagebusConfig, + SlobroksConfig slobroksConfig, String identity, Protocol protocol) { - MessageBusParams mbusParams = new MessageBusParams().addProtocol(protocol); + MessageBusParams mbusParams = new MessageBusParams().addProtocol(protocol) + .setMessageBusConfig(messagebusConfig); int maxPendingSize = DocumentUtil .calculateMaxPendingSize(mbusConfig.maxConcurrentFactor(), mbusConfig.documentExpansionFactor(), @@ -119,7 +115,7 @@ public final class SessionCache extends AbstractComponent { mbusParams.setMaxPendingSize(maxPendingSize); RPCNetworkParams netParams = new RPCNetworkParams() - .setSlobrokConfigId(slobrokConfigId) + .setSlobroksConfig(slobroksConfig) .setIdentity(new Identity(identity)) .setListenPort(mbusConfig.port()) .setNumTargetsPerSpec(mbusConfig.numconnectionspertarget()) @@ -144,20 +140,10 @@ public final class SessionCache extends AbstractComponent { } ReferencedResource<SharedIntermediateSession> retainIntermediate(final IntermediateSessionParams p) { - synchronized (this) { - if (!isStarted()) { - start(); - } - } return intermediatesCreator.retain(intermediateLock, intermediates, p); } public ReferencedResource<SharedSourceSession> retainSource(final SourceSessionParams p) { - synchronized (this) { - if (!isStarted()) { - start(); - } - } return sourcesCreator.retain(sourceLock, sources, p); } |