diff options
Diffstat (limited to 'container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java')
-rw-r--r-- | container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java | 58 |
1 files changed, 24 insertions, 34 deletions
diff --git a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java index 850cfc0b4bf..3e3902b35aa 100644 --- a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java +++ b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java @@ -18,14 +18,14 @@ import com.yahoo.jdisc.SharedResource; import com.yahoo.messagebus.ConfigAgent; import com.yahoo.messagebus.DynamicThrottlePolicy; import com.yahoo.messagebus.IntermediateSessionParams; +import com.yahoo.messagebus.MessageBus; import com.yahoo.messagebus.MessageBusParams; import com.yahoo.messagebus.MessagebusConfig; import com.yahoo.messagebus.Protocol; import com.yahoo.messagebus.SourceSessionParams; import com.yahoo.messagebus.StaticThrottlePolicy; import com.yahoo.messagebus.ThrottlePolicy; -import com.yahoo.messagebus.network.Identity; -import com.yahoo.messagebus.network.rpc.RPCNetworkParams; +import com.yahoo.messagebus.network.NetworkMultiplexer; import com.yahoo.messagebus.shared.SharedIntermediateSession; import com.yahoo.messagebus.shared.SharedMessageBus; import com.yahoo.messagebus.shared.SharedSourceSession; @@ -65,44 +65,42 @@ public final class SessionCache extends AbstractComponent { private final SourceSessionCreator sourcesCreator = new SourceSessionCreator(); @Inject - public SessionCache(ContainerMbusConfig containerMbusConfig, DocumentmanagerConfig documentmanagerConfig, - LoadTypeConfig loadTypeConfig, SlobroksConfig slobroksConfig, - MessagebusConfig messagebusConfig, DocumentProtocolPoliciesConfig policiesConfig, + public SessionCache(NetworkMultiplexerProvider nets, ContainerMbusConfig containerMbusConfig, + DocumentmanagerConfig documentmanagerConfig, + LoadTypeConfig loadTypeConfig, MessagebusConfig messagebusConfig, + DocumentProtocolPoliciesConfig policiesConfig, DistributionConfig distributionConfig) { - this(containerMbusConfig, documentmanagerConfig, loadTypeConfig, slobroksConfig, - messagebusConfig, policiesConfig, distributionConfig, System.getProperty("config.id")); //: + this(nets.net(), containerMbusConfig, documentmanagerConfig, + loadTypeConfig, messagebusConfig, policiesConfig, distributionConfig); + } - public SessionCache(ContainerMbusConfig containerMbusConfig, DocumentmanagerConfig documentmanagerConfig, - LoadTypeConfig loadTypeConfig, SlobroksConfig slobroksConfig, - MessagebusConfig messagebusConfig, DocumentProtocolPoliciesConfig policiesConfig, - DistributionConfig distributionConfig, String identity) { - this(containerMbusConfig, - slobroksConfig, + public SessionCache(NetworkMultiplexer net, ContainerMbusConfig containerMbusConfig, + DocumentmanagerConfig documentmanagerConfig, + LoadTypeConfig loadTypeConfig, MessagebusConfig messagebusConfig, + DocumentProtocolPoliciesConfig policiesConfig, + DistributionConfig distributionConfig) { + this(net, + containerMbusConfig, messagebusConfig, - identity, new DocumentProtocol(new DocumentTypeManager(documentmanagerConfig), new LoadTypeSet(loadTypeConfig), policiesConfig, distributionConfig)); } - public SessionCache(ContainerMbusConfig containerMbusConfig, SlobroksConfig slobroksConfig, - MessagebusConfig messagebusConfig, String identity, Protocol protocol) { - this.messageBus = createSharedMessageBus(containerMbusConfig, - messagebusConfig, - slobroksConfig, - identity, - protocol); + public SessionCache(NetworkMultiplexer net, ContainerMbusConfig containerMbusConfig, + MessagebusConfig messagebusConfig, Protocol protocol) { + this.messageBus = createSharedMessageBus(net, containerMbusConfig, messagebusConfig, protocol); } public void deconstruct() { messageBus.release(); } - private static SharedMessageBus createSharedMessageBus(ContainerMbusConfig mbusConfig, + private static SharedMessageBus createSharedMessageBus(NetworkMultiplexer net, + ContainerMbusConfig mbusConfig, MessagebusConfig messagebusConfig, - SlobroksConfig slobroksConfig, String identity, Protocol protocol) { MessageBusParams mbusParams = new MessageBusParams().addProtocol(protocol); @@ -114,17 +112,9 @@ public final class SessionCache extends AbstractComponent { mbusParams.setMaxPendingCount(mbusConfig.maxpendingcount()); mbusParams.setMaxPendingSize(maxPendingSize); - RPCNetworkParams netParams = new RPCNetworkParams() - .setSlobroksConfig(slobroksConfig) - .setIdentity(new Identity(identity)) - .setListenPort(mbusConfig.port()) - .setNumTargetsPerSpec(mbusConfig.numconnectionspertarget()) - .setNumNetworkThreads(mbusConfig.numthreads()) - .setTransportEventsBeforeWakeup(mbusConfig.transport_events_before_wakeup()) - .setOptimization(RPCNetworkParams.Optimization.valueOf(mbusConfig.optimize_for().name())); - SharedMessageBus bus = SharedMessageBus.newInstance(mbusParams, netParams); - new ConfigAgent(messagebusConfig, bus.messageBus()); // Configure the wrapped MessageBus with a routing table. - return bus; + MessageBus bus = new MessageBus(net, mbusParams); + new ConfigAgent(messagebusConfig, bus); // Configure the wrapped MessageBus with a routing table. + return new SharedMessageBus(bus); } private static void logSystemInfo(ContainerMbusConfig containerMbusConfig, long maxPendingSize) { |