diff options
author | Jon Marius Venstad <jonmv@users.noreply.github.com> | 2021-07-02 13:03:42 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-07-02 13:03:42 +0200 |
commit | df5b3c4a3717ecb6fe490aa354baeadab060e720 (patch) | |
tree | 157f6bc0dca7eaf515672e826944b0af2b4ce055 /container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java | |
parent | 7537c42fa661952916cf91eebbb75e6984505100 (diff) |
Revert "Jonmv/reapply session cache with injected config"
Diffstat (limited to 'container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java')
-rw-r--r-- | container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java | 114 |
1 files changed, 64 insertions, 50 deletions
diff --git a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java index ab3d09af178..d65b2f7cc12 100644 --- a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java +++ b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java @@ -1,24 +1,22 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.container.jdisc.messagebus; -import com.google.inject.Inject; -import com.yahoo.cloud.config.SlobroksConfig; import com.yahoo.component.AbstractComponent; +import com.yahoo.config.subscription.ConfigGetter; import com.yahoo.container.jdisc.ContainerMbusConfig; import com.yahoo.document.DocumentTypeManager; import com.yahoo.document.DocumentUtil; -import com.yahoo.document.config.DocumentmanagerConfig; import com.yahoo.documentapi.messagebus.loadtypes.LoadTypeSet; import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; -import com.yahoo.documentapi.messagebus.protocol.DocumentProtocolPoliciesConfig; import com.yahoo.jdisc.ReferencedResource; import com.yahoo.jdisc.References; import com.yahoo.jdisc.ResourceReference; import com.yahoo.jdisc.SharedResource; +import java.util.logging.Level; +import com.yahoo.messagebus.ConfigAgent; import com.yahoo.messagebus.DynamicThrottlePolicy; import com.yahoo.messagebus.IntermediateSessionParams; import com.yahoo.messagebus.MessageBusParams; -import com.yahoo.messagebus.MessagebusConfig; import com.yahoo.messagebus.Protocol; import com.yahoo.messagebus.SourceSessionParams; import com.yahoo.messagebus.StaticThrottlePolicy; @@ -28,12 +26,9 @@ import com.yahoo.messagebus.network.rpc.RPCNetworkParams; import com.yahoo.messagebus.shared.SharedIntermediateSession; import com.yahoo.messagebus.shared.SharedMessageBus; import com.yahoo.messagebus.shared.SharedSourceSession; -import com.yahoo.vespa.config.content.DistributionConfig; -import com.yahoo.vespa.config.content.LoadTypeConfig; import java.util.HashMap; import java.util.Map; -import java.util.logging.Level; import java.util.logging.Logger; /** @@ -42,19 +37,24 @@ import java.util.logging.Logger; * @author Steinar Knutsen * @author Einar Rosenvinge */ -// TODO jonmv: Remove this: only used with more than one entry by FeedHandlerV3, where only timeout varies. -// rant: This whole construct is because DI at one point didn't exist, so getting hold of a shared resource -// or session was hard(?), and one resorted to routing through the Container, using URIs, to the correct -// MbusClient, with or without throttling. This introduced the problem of ownership during shutdown, -// which was solved with manual reference counting. This is all much better solved with DI, which (now) -// owns everything, and does component shutdown in reverse construction order, which is always right. -// So for the sake everyone's mental health, this should all just be removed now! I suspect this is -// even the case for Request; we can track in handlers, and warn when requests have been misplaced. +// TODO jonmv: Remove this? Only used sensibly by FeedHandlerV3, where only timeout varies. public final class SessionCache extends AbstractComponent { private static final Logger log = Logger.getLogger(SessionCache.class.getName()); - private final SharedMessageBus messageBus; + //config + private final String messagebusConfigId; + private final String slobrokConfigId; + private final String identity; + private final String containerMbusConfigId; + private final String documentManagerConfigId; + private final String loadTypeConfigId; + private final DocumentTypeManager documentTypeManager; + + // initialized in start() + private ConfigAgent configAgent; + private SharedMessageBus messageBus; + private final Object intermediateLock = new Object(); private final Map<String, SharedIntermediateSession> intermediates = new HashMap<>(); private final IntermediateSessionCreator intermediatesCreator = new IntermediateSessionCreator(); @@ -63,48 +63,52 @@ public final class SessionCache extends AbstractComponent { private final Map<SourceSessionKey, SharedSourceSession> sources = new HashMap<>(); private final SourceSessionCreator sourcesCreator = new SourceSessionCreator(); - @Inject - public SessionCache(ContainerMbusConfig containerMbusConfig, DocumentmanagerConfig documentmanagerConfig, - LoadTypeConfig loadTypeConfig, SlobroksConfig slobroksConfig, - MessagebusConfig messagebusConfig, DocumentProtocolPoliciesConfig policiesConfig, - DistributionConfig distributionConfig) { - this(containerMbusConfig, documentmanagerConfig, loadTypeConfig, slobroksConfig, - messagebusConfig, policiesConfig, distributionConfig, System.getProperty("config.id")); //: + public SessionCache(String messagebusConfigId, String slobrokConfigId, String identity, + String containerMbusConfigId, String documentManagerConfigId, + String loadTypeConfigId, + DocumentTypeManager documentTypeManager) { + this.messagebusConfigId = messagebusConfigId; + this.slobrokConfigId = slobrokConfigId; + this.identity = identity; + this.containerMbusConfigId = containerMbusConfigId; + this.documentManagerConfigId = documentManagerConfigId; + this.loadTypeConfigId = loadTypeConfigId; + this.documentTypeManager = documentTypeManager; } - public SessionCache(ContainerMbusConfig containerMbusConfig, DocumentmanagerConfig documentmanagerConfig, - LoadTypeConfig loadTypeConfig, SlobroksConfig slobroksConfig, - MessagebusConfig messagebusConfig, DocumentProtocolPoliciesConfig policiesConfig, - DistributionConfig distributionConfig, String identity) { - this.messageBus = createSharedMessageBus(containerMbusConfig, - messagebusConfig, - slobroksConfig, - identity, - new DocumentProtocol(new DocumentTypeManager(documentmanagerConfig), - new LoadTypeSet(loadTypeConfig), - policiesConfig, - distributionConfig)); + public SessionCache(final String identity) { + this(identity, identity, identity, identity, identity, identity, new DocumentTypeManager()); } - public SessionCache(ContainerMbusConfig containerMbusConfig, SlobroksConfig slobroksConfig, - MessagebusConfig messagebusConfig, String identity, Protocol protocol) { - this.messageBus = createSharedMessageBus(containerMbusConfig, - messagebusConfig, - slobroksConfig, - identity, - protocol); + public void deconstruct() { + if (configAgent != null) { + configAgent.shutdown(); + } } - public void deconstruct() { - messageBus.release(); + + private void start() { + ContainerMbusConfig mbusConfig = ConfigGetter.getConfig(ContainerMbusConfig.class, containerMbusConfigId); + if (documentManagerConfigId != null) { + documentTypeManager.configure(documentManagerConfigId); + } + LoadTypeSet loadTypeSet = new LoadTypeSet(loadTypeConfigId); + DocumentProtocol protocol = new DocumentProtocol(documentTypeManager, identity, loadTypeSet); + messageBus = createSharedMessageBus(mbusConfig, slobrokConfigId, identity, protocol); + // TODO: stop doing subscriptions to config when that is to be solved in slobrok as well + configAgent = new ConfigAgent(messagebusConfigId, messageBus.messageBus()); + configAgent.subscribe(); + } + + + private boolean isStarted() { + return messageBus != null; } private static SharedMessageBus createSharedMessageBus(ContainerMbusConfig mbusConfig, - MessagebusConfig messagebusConfig, - SlobroksConfig slobroksConfig, String identity, + String slobrokConfigId, String identity, Protocol protocol) { - MessageBusParams mbusParams = new MessageBusParams().addProtocol(protocol) - .setMessageBusConfig(messagebusConfig); + MessageBusParams mbusParams = new MessageBusParams().addProtocol(protocol); int maxPendingSize = DocumentUtil .calculateMaxPendingSize(mbusConfig.maxConcurrentFactor(), mbusConfig.documentExpansionFactor(), @@ -115,7 +119,7 @@ public final class SessionCache extends AbstractComponent { mbusParams.setMaxPendingSize(maxPendingSize); RPCNetworkParams netParams = new RPCNetworkParams() - .setSlobroksConfig(slobroksConfig) + .setSlobrokConfigId(slobrokConfigId) .setIdentity(new Identity(identity)) .setListenPort(mbusConfig.port()) .setNumTargetsPerSpec(mbusConfig.numconnectionspertarget()) @@ -140,10 +144,20 @@ public final class SessionCache extends AbstractComponent { } ReferencedResource<SharedIntermediateSession> retainIntermediate(final IntermediateSessionParams p) { + synchronized (this) { + if (!isStarted()) { + start(); + } + } return intermediatesCreator.retain(intermediateLock, intermediates, p); } public ReferencedResource<SharedSourceSession> retainSource(final SourceSessionParams p) { + synchronized (this) { + if (!isStarted()) { + start(); + } + } return sourcesCreator.retain(sourceLock, sources, p); } |