aboutsummaryrefslogtreecommitdiffstats
path: root/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java
diff options
context:
space:
mode:
Diffstat (limited to 'container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java')
-rw-r--r--container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java114
1 files changed, 50 insertions, 64 deletions
diff --git a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java
index d65b2f7cc12..ab3d09af178 100644
--- a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java
+++ b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java
@@ -1,22 +1,24 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.container.jdisc.messagebus;
+import com.google.inject.Inject;
+import com.yahoo.cloud.config.SlobroksConfig;
import com.yahoo.component.AbstractComponent;
-import com.yahoo.config.subscription.ConfigGetter;
import com.yahoo.container.jdisc.ContainerMbusConfig;
import com.yahoo.document.DocumentTypeManager;
import com.yahoo.document.DocumentUtil;
+import com.yahoo.document.config.DocumentmanagerConfig;
import com.yahoo.documentapi.messagebus.loadtypes.LoadTypeSet;
import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
+import com.yahoo.documentapi.messagebus.protocol.DocumentProtocolPoliciesConfig;
import com.yahoo.jdisc.ReferencedResource;
import com.yahoo.jdisc.References;
import com.yahoo.jdisc.ResourceReference;
import com.yahoo.jdisc.SharedResource;
-import java.util.logging.Level;
-import com.yahoo.messagebus.ConfigAgent;
import com.yahoo.messagebus.DynamicThrottlePolicy;
import com.yahoo.messagebus.IntermediateSessionParams;
import com.yahoo.messagebus.MessageBusParams;
+import com.yahoo.messagebus.MessagebusConfig;
import com.yahoo.messagebus.Protocol;
import com.yahoo.messagebus.SourceSessionParams;
import com.yahoo.messagebus.StaticThrottlePolicy;
@@ -26,9 +28,12 @@ import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
import com.yahoo.messagebus.shared.SharedIntermediateSession;
import com.yahoo.messagebus.shared.SharedMessageBus;
import com.yahoo.messagebus.shared.SharedSourceSession;
+import com.yahoo.vespa.config.content.DistributionConfig;
+import com.yahoo.vespa.config.content.LoadTypeConfig;
import java.util.HashMap;
import java.util.Map;
+import java.util.logging.Level;
import java.util.logging.Logger;
/**
@@ -37,24 +42,19 @@ import java.util.logging.Logger;
* @author Steinar Knutsen
* @author Einar Rosenvinge
*/
-// TODO jonmv: Remove this? Only used sensibly by FeedHandlerV3, where only timeout varies.
+// TODO jonmv: Remove this: only used with more than one entry by FeedHandlerV3, where only timeout varies.
+// rant: This whole construct is because DI at one point didn't exist, so getting hold of a shared resource
+// or session was hard(?), and one resorted to routing through the Container, using URIs, to the correct
+// MbusClient, with or without throttling. This introduced the problem of ownership during shutdown,
+// which was solved with manual reference counting. This is all much better solved with DI, which (now)
+// owns everything, and does component shutdown in reverse construction order, which is always right.
+// So for the sake everyone's mental health, this should all just be removed now! I suspect this is
+// even the case for Request; we can track in handlers, and warn when requests have been misplaced.
public final class SessionCache extends AbstractComponent {
private static final Logger log = Logger.getLogger(SessionCache.class.getName());
- //config
- private final String messagebusConfigId;
- private final String slobrokConfigId;
- private final String identity;
- private final String containerMbusConfigId;
- private final String documentManagerConfigId;
- private final String loadTypeConfigId;
- private final DocumentTypeManager documentTypeManager;
-
- // initialized in start()
- private ConfigAgent configAgent;
- private SharedMessageBus messageBus;
-
+ private final SharedMessageBus messageBus;
private final Object intermediateLock = new Object();
private final Map<String, SharedIntermediateSession> intermediates = new HashMap<>();
private final IntermediateSessionCreator intermediatesCreator = new IntermediateSessionCreator();
@@ -63,52 +63,48 @@ public final class SessionCache extends AbstractComponent {
private final Map<SourceSessionKey, SharedSourceSession> sources = new HashMap<>();
private final SourceSessionCreator sourcesCreator = new SourceSessionCreator();
- public SessionCache(String messagebusConfigId, String slobrokConfigId, String identity,
- String containerMbusConfigId, String documentManagerConfigId,
- String loadTypeConfigId,
- DocumentTypeManager documentTypeManager) {
- this.messagebusConfigId = messagebusConfigId;
- this.slobrokConfigId = slobrokConfigId;
- this.identity = identity;
- this.containerMbusConfigId = containerMbusConfigId;
- this.documentManagerConfigId = documentManagerConfigId;
- this.loadTypeConfigId = loadTypeConfigId;
- this.documentTypeManager = documentTypeManager;
+ @Inject
+ public SessionCache(ContainerMbusConfig containerMbusConfig, DocumentmanagerConfig documentmanagerConfig,
+ LoadTypeConfig loadTypeConfig, SlobroksConfig slobroksConfig,
+ MessagebusConfig messagebusConfig, DocumentProtocolPoliciesConfig policiesConfig,
+ DistributionConfig distributionConfig) {
+ this(containerMbusConfig, documentmanagerConfig, loadTypeConfig, slobroksConfig,
+ messagebusConfig, policiesConfig, distributionConfig, System.getProperty("config.id")); //:
}
- public SessionCache(final String identity) {
- this(identity, identity, identity, identity, identity, identity, new DocumentTypeManager());
- }
-
- public void deconstruct() {
- if (configAgent != null) {
- configAgent.shutdown();
- }
+ public SessionCache(ContainerMbusConfig containerMbusConfig, DocumentmanagerConfig documentmanagerConfig,
+ LoadTypeConfig loadTypeConfig, SlobroksConfig slobroksConfig,
+ MessagebusConfig messagebusConfig, DocumentProtocolPoliciesConfig policiesConfig,
+ DistributionConfig distributionConfig, String identity) {
+ this.messageBus = createSharedMessageBus(containerMbusConfig,
+ messagebusConfig,
+ slobroksConfig,
+ identity,
+ new DocumentProtocol(new DocumentTypeManager(documentmanagerConfig),
+ new LoadTypeSet(loadTypeConfig),
+ policiesConfig,
+ distributionConfig));
}
-
- private void start() {
- ContainerMbusConfig mbusConfig = ConfigGetter.getConfig(ContainerMbusConfig.class, containerMbusConfigId);
- if (documentManagerConfigId != null) {
- documentTypeManager.configure(documentManagerConfigId);
- }
- LoadTypeSet loadTypeSet = new LoadTypeSet(loadTypeConfigId);
- DocumentProtocol protocol = new DocumentProtocol(documentTypeManager, identity, loadTypeSet);
- messageBus = createSharedMessageBus(mbusConfig, slobrokConfigId, identity, protocol);
- // TODO: stop doing subscriptions to config when that is to be solved in slobrok as well
- configAgent = new ConfigAgent(messagebusConfigId, messageBus.messageBus());
- configAgent.subscribe();
+ public SessionCache(ContainerMbusConfig containerMbusConfig, SlobroksConfig slobroksConfig,
+ MessagebusConfig messagebusConfig, String identity, Protocol protocol) {
+ this.messageBus = createSharedMessageBus(containerMbusConfig,
+ messagebusConfig,
+ slobroksConfig,
+ identity,
+ protocol);
}
-
- private boolean isStarted() {
- return messageBus != null;
+ public void deconstruct() {
+ messageBus.release();
}
private static SharedMessageBus createSharedMessageBus(ContainerMbusConfig mbusConfig,
- String slobrokConfigId, String identity,
+ MessagebusConfig messagebusConfig,
+ SlobroksConfig slobroksConfig, String identity,
Protocol protocol) {
- MessageBusParams mbusParams = new MessageBusParams().addProtocol(protocol);
+ MessageBusParams mbusParams = new MessageBusParams().addProtocol(protocol)
+ .setMessageBusConfig(messagebusConfig);
int maxPendingSize = DocumentUtil
.calculateMaxPendingSize(mbusConfig.maxConcurrentFactor(), mbusConfig.documentExpansionFactor(),
@@ -119,7 +115,7 @@ public final class SessionCache extends AbstractComponent {
mbusParams.setMaxPendingSize(maxPendingSize);
RPCNetworkParams netParams = new RPCNetworkParams()
- .setSlobrokConfigId(slobrokConfigId)
+ .setSlobroksConfig(slobroksConfig)
.setIdentity(new Identity(identity))
.setListenPort(mbusConfig.port())
.setNumTargetsPerSpec(mbusConfig.numconnectionspertarget())
@@ -144,20 +140,10 @@ public final class SessionCache extends AbstractComponent {
}
ReferencedResource<SharedIntermediateSession> retainIntermediate(final IntermediateSessionParams p) {
- synchronized (this) {
- if (!isStarted()) {
- start();
- }
- }
return intermediatesCreator.retain(intermediateLock, intermediates, p);
}
public ReferencedResource<SharedSourceSession> retainSource(final SourceSessionParams p) {
- synchronized (this) {
- if (!isStarted()) {
- start();
- }
- }
return sourcesCreator.retain(sourceLock, sources, p);
}