summaryrefslogtreecommitdiffstats
path: root/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java
diff options
context:
space:
mode:
Diffstat (limited to 'container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java')
-rw-r--r--container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java114
1 files changed, 64 insertions, 50 deletions
diff --git a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java
index ab3d09af178..d65b2f7cc12 100644
--- a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java
+++ b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java
@@ -1,24 +1,22 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.container.jdisc.messagebus;
-import com.google.inject.Inject;
-import com.yahoo.cloud.config.SlobroksConfig;
import com.yahoo.component.AbstractComponent;
+import com.yahoo.config.subscription.ConfigGetter;
import com.yahoo.container.jdisc.ContainerMbusConfig;
import com.yahoo.document.DocumentTypeManager;
import com.yahoo.document.DocumentUtil;
-import com.yahoo.document.config.DocumentmanagerConfig;
import com.yahoo.documentapi.messagebus.loadtypes.LoadTypeSet;
import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
-import com.yahoo.documentapi.messagebus.protocol.DocumentProtocolPoliciesConfig;
import com.yahoo.jdisc.ReferencedResource;
import com.yahoo.jdisc.References;
import com.yahoo.jdisc.ResourceReference;
import com.yahoo.jdisc.SharedResource;
+import java.util.logging.Level;
+import com.yahoo.messagebus.ConfigAgent;
import com.yahoo.messagebus.DynamicThrottlePolicy;
import com.yahoo.messagebus.IntermediateSessionParams;
import com.yahoo.messagebus.MessageBusParams;
-import com.yahoo.messagebus.MessagebusConfig;
import com.yahoo.messagebus.Protocol;
import com.yahoo.messagebus.SourceSessionParams;
import com.yahoo.messagebus.StaticThrottlePolicy;
@@ -28,12 +26,9 @@ import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
import com.yahoo.messagebus.shared.SharedIntermediateSession;
import com.yahoo.messagebus.shared.SharedMessageBus;
import com.yahoo.messagebus.shared.SharedSourceSession;
-import com.yahoo.vespa.config.content.DistributionConfig;
-import com.yahoo.vespa.config.content.LoadTypeConfig;
import java.util.HashMap;
import java.util.Map;
-import java.util.logging.Level;
import java.util.logging.Logger;
/**
@@ -42,19 +37,24 @@ import java.util.logging.Logger;
* @author Steinar Knutsen
* @author Einar Rosenvinge
*/
-// TODO jonmv: Remove this: only used with more than one entry by FeedHandlerV3, where only timeout varies.
-// rant: This whole construct is because DI at one point didn't exist, so getting hold of a shared resource
-// or session was hard(?), and one resorted to routing through the Container, using URIs, to the correct
-// MbusClient, with or without throttling. This introduced the problem of ownership during shutdown,
-// which was solved with manual reference counting. This is all much better solved with DI, which (now)
-// owns everything, and does component shutdown in reverse construction order, which is always right.
-// So for the sake everyone's mental health, this should all just be removed now! I suspect this is
-// even the case for Request; we can track in handlers, and warn when requests have been misplaced.
+// TODO jonmv: Remove this? Only used sensibly by FeedHandlerV3, where only timeout varies.
public final class SessionCache extends AbstractComponent {
private static final Logger log = Logger.getLogger(SessionCache.class.getName());
- private final SharedMessageBus messageBus;
+ //config
+ private final String messagebusConfigId;
+ private final String slobrokConfigId;
+ private final String identity;
+ private final String containerMbusConfigId;
+ private final String documentManagerConfigId;
+ private final String loadTypeConfigId;
+ private final DocumentTypeManager documentTypeManager;
+
+ // initialized in start()
+ private ConfigAgent configAgent;
+ private SharedMessageBus messageBus;
+
private final Object intermediateLock = new Object();
private final Map<String, SharedIntermediateSession> intermediates = new HashMap<>();
private final IntermediateSessionCreator intermediatesCreator = new IntermediateSessionCreator();
@@ -63,48 +63,52 @@ public final class SessionCache extends AbstractComponent {
private final Map<SourceSessionKey, SharedSourceSession> sources = new HashMap<>();
private final SourceSessionCreator sourcesCreator = new SourceSessionCreator();
- @Inject
- public SessionCache(ContainerMbusConfig containerMbusConfig, DocumentmanagerConfig documentmanagerConfig,
- LoadTypeConfig loadTypeConfig, SlobroksConfig slobroksConfig,
- MessagebusConfig messagebusConfig, DocumentProtocolPoliciesConfig policiesConfig,
- DistributionConfig distributionConfig) {
- this(containerMbusConfig, documentmanagerConfig, loadTypeConfig, slobroksConfig,
- messagebusConfig, policiesConfig, distributionConfig, System.getProperty("config.id")); //:
+ public SessionCache(String messagebusConfigId, String slobrokConfigId, String identity,
+ String containerMbusConfigId, String documentManagerConfigId,
+ String loadTypeConfigId,
+ DocumentTypeManager documentTypeManager) {
+ this.messagebusConfigId = messagebusConfigId;
+ this.slobrokConfigId = slobrokConfigId;
+ this.identity = identity;
+ this.containerMbusConfigId = containerMbusConfigId;
+ this.documentManagerConfigId = documentManagerConfigId;
+ this.loadTypeConfigId = loadTypeConfigId;
+ this.documentTypeManager = documentTypeManager;
}
- public SessionCache(ContainerMbusConfig containerMbusConfig, DocumentmanagerConfig documentmanagerConfig,
- LoadTypeConfig loadTypeConfig, SlobroksConfig slobroksConfig,
- MessagebusConfig messagebusConfig, DocumentProtocolPoliciesConfig policiesConfig,
- DistributionConfig distributionConfig, String identity) {
- this.messageBus = createSharedMessageBus(containerMbusConfig,
- messagebusConfig,
- slobroksConfig,
- identity,
- new DocumentProtocol(new DocumentTypeManager(documentmanagerConfig),
- new LoadTypeSet(loadTypeConfig),
- policiesConfig,
- distributionConfig));
+ public SessionCache(final String identity) {
+ this(identity, identity, identity, identity, identity, identity, new DocumentTypeManager());
}
- public SessionCache(ContainerMbusConfig containerMbusConfig, SlobroksConfig slobroksConfig,
- MessagebusConfig messagebusConfig, String identity, Protocol protocol) {
- this.messageBus = createSharedMessageBus(containerMbusConfig,
- messagebusConfig,
- slobroksConfig,
- identity,
- protocol);
+ public void deconstruct() {
+ if (configAgent != null) {
+ configAgent.shutdown();
+ }
}
- public void deconstruct() {
- messageBus.release();
+
+ private void start() {
+ ContainerMbusConfig mbusConfig = ConfigGetter.getConfig(ContainerMbusConfig.class, containerMbusConfigId);
+ if (documentManagerConfigId != null) {
+ documentTypeManager.configure(documentManagerConfigId);
+ }
+ LoadTypeSet loadTypeSet = new LoadTypeSet(loadTypeConfigId);
+ DocumentProtocol protocol = new DocumentProtocol(documentTypeManager, identity, loadTypeSet);
+ messageBus = createSharedMessageBus(mbusConfig, slobrokConfigId, identity, protocol);
+ // TODO: stop doing subscriptions to config when that is to be solved in slobrok as well
+ configAgent = new ConfigAgent(messagebusConfigId, messageBus.messageBus());
+ configAgent.subscribe();
+ }
+
+
+ private boolean isStarted() {
+ return messageBus != null;
}
private static SharedMessageBus createSharedMessageBus(ContainerMbusConfig mbusConfig,
- MessagebusConfig messagebusConfig,
- SlobroksConfig slobroksConfig, String identity,
+ String slobrokConfigId, String identity,
Protocol protocol) {
- MessageBusParams mbusParams = new MessageBusParams().addProtocol(protocol)
- .setMessageBusConfig(messagebusConfig);
+ MessageBusParams mbusParams = new MessageBusParams().addProtocol(protocol);
int maxPendingSize = DocumentUtil
.calculateMaxPendingSize(mbusConfig.maxConcurrentFactor(), mbusConfig.documentExpansionFactor(),
@@ -115,7 +119,7 @@ public final class SessionCache extends AbstractComponent {
mbusParams.setMaxPendingSize(maxPendingSize);
RPCNetworkParams netParams = new RPCNetworkParams()
- .setSlobroksConfig(slobroksConfig)
+ .setSlobrokConfigId(slobrokConfigId)
.setIdentity(new Identity(identity))
.setListenPort(mbusConfig.port())
.setNumTargetsPerSpec(mbusConfig.numconnectionspertarget())
@@ -140,10 +144,20 @@ public final class SessionCache extends AbstractComponent {
}
ReferencedResource<SharedIntermediateSession> retainIntermediate(final IntermediateSessionParams p) {
+ synchronized (this) {
+ if (!isStarted()) {
+ start();
+ }
+ }
return intermediatesCreator.retain(intermediateLock, intermediates, p);
}
public ReferencedResource<SharedSourceSession> retainSource(final SourceSessionParams p) {
+ synchronized (this) {
+ if (!isStarted()) {
+ start();
+ }
+ }
return sourcesCreator.retain(sourceLock, sources, p);
}