aboutsummaryrefslogtreecommitdiffstats
path: root/container-messagebus
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2021-08-11 12:00:16 +0200
committerJon Marius Venstad <venstad@gmail.com>2021-08-16 21:01:03 +0200
commitf6510a771d4a8b00d3027131f6071e95a56b04bd (patch)
tree5cd5eefaedcecb94a91b1f612a2b0d81a7eaa67f /container-messagebus
parent22999cb793c055ea094a7c4d6cbdd92441819b38 (diff)
Ensure shared message bus has its routing policy set up on construction
Diffstat (limited to 'container-messagebus')
-rw-r--r--container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java24
1 files changed, 13 insertions, 11 deletions
diff --git a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java
index ab3d09af178..cfb3223a4f0 100644
--- a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java
+++ b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java
@@ -15,6 +15,7 @@ import com.yahoo.jdisc.ReferencedResource;
import com.yahoo.jdisc.References;
import com.yahoo.jdisc.ResourceReference;
import com.yahoo.jdisc.SharedResource;
+import com.yahoo.messagebus.ConfigAgent;
import com.yahoo.messagebus.DynamicThrottlePolicy;
import com.yahoo.messagebus.IntermediateSessionParams;
import com.yahoo.messagebus.MessageBusParams;
@@ -76,14 +77,14 @@ public final class SessionCache extends AbstractComponent {
LoadTypeConfig loadTypeConfig, SlobroksConfig slobroksConfig,
MessagebusConfig messagebusConfig, DocumentProtocolPoliciesConfig policiesConfig,
DistributionConfig distributionConfig, String identity) {
- this.messageBus = createSharedMessageBus(containerMbusConfig,
- messagebusConfig,
- slobroksConfig,
- identity,
- new DocumentProtocol(new DocumentTypeManager(documentmanagerConfig),
- new LoadTypeSet(loadTypeConfig),
- policiesConfig,
- distributionConfig));
+ this(containerMbusConfig,
+ slobroksConfig,
+ messagebusConfig,
+ identity,
+ new DocumentProtocol(new DocumentTypeManager(documentmanagerConfig),
+ new LoadTypeSet(loadTypeConfig),
+ policiesConfig,
+ distributionConfig));
}
public SessionCache(ContainerMbusConfig containerMbusConfig, SlobroksConfig slobroksConfig,
@@ -103,8 +104,7 @@ public final class SessionCache extends AbstractComponent {
MessagebusConfig messagebusConfig,
SlobroksConfig slobroksConfig, String identity,
Protocol protocol) {
- MessageBusParams mbusParams = new MessageBusParams().addProtocol(protocol)
- .setMessageBusConfig(messagebusConfig);
+ MessageBusParams mbusParams = new MessageBusParams().addProtocol(protocol);
int maxPendingSize = DocumentUtil
.calculateMaxPendingSize(mbusConfig.maxConcurrentFactor(), mbusConfig.documentExpansionFactor(),
@@ -122,7 +122,9 @@ public final class SessionCache extends AbstractComponent {
.setNumNetworkThreads(mbusConfig.numthreads())
.setTransportEventsBeforeWakeup(mbusConfig.transport_events_before_wakeup())
.setOptimization(RPCNetworkParams.Optimization.valueOf(mbusConfig.optimize_for().name()));
- return SharedMessageBus.newInstance(mbusParams, netParams);
+ SharedMessageBus bus = SharedMessageBus.newInstance(mbusParams, netParams);
+ new ConfigAgent(messagebusConfig, bus.messageBus()); // Configure the wrapped MessageBus with a routing table.
+ return bus;
}
private static void logSystemInfo(ContainerMbusConfig containerMbusConfig, long maxPendingSize) {