summaryrefslogtreecommitdiffstats
path: root/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java
diff options
context:
space:
mode:
Diffstat (limited to 'container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java')
-rw-r--r--container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java58
1 files changed, 24 insertions, 34 deletions
diff --git a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java
index 850cfc0b4bf..3e3902b35aa 100644
--- a/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java
+++ b/container-messagebus/src/main/java/com/yahoo/container/jdisc/messagebus/SessionCache.java
@@ -18,14 +18,14 @@ import com.yahoo.jdisc.SharedResource;
import com.yahoo.messagebus.ConfigAgent;
import com.yahoo.messagebus.DynamicThrottlePolicy;
import com.yahoo.messagebus.IntermediateSessionParams;
+import com.yahoo.messagebus.MessageBus;
import com.yahoo.messagebus.MessageBusParams;
import com.yahoo.messagebus.MessagebusConfig;
import com.yahoo.messagebus.Protocol;
import com.yahoo.messagebus.SourceSessionParams;
import com.yahoo.messagebus.StaticThrottlePolicy;
import com.yahoo.messagebus.ThrottlePolicy;
-import com.yahoo.messagebus.network.Identity;
-import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
+import com.yahoo.messagebus.network.NetworkMultiplexer;
import com.yahoo.messagebus.shared.SharedIntermediateSession;
import com.yahoo.messagebus.shared.SharedMessageBus;
import com.yahoo.messagebus.shared.SharedSourceSession;
@@ -65,44 +65,42 @@ public final class SessionCache extends AbstractComponent {
private final SourceSessionCreator sourcesCreator = new SourceSessionCreator();
@Inject
- public SessionCache(ContainerMbusConfig containerMbusConfig, DocumentmanagerConfig documentmanagerConfig,
- LoadTypeConfig loadTypeConfig, SlobroksConfig slobroksConfig,
- MessagebusConfig messagebusConfig, DocumentProtocolPoliciesConfig policiesConfig,
+ public SessionCache(NetworkMultiplexerProvider nets, ContainerMbusConfig containerMbusConfig,
+ DocumentmanagerConfig documentmanagerConfig,
+ LoadTypeConfig loadTypeConfig, MessagebusConfig messagebusConfig,
+ DocumentProtocolPoliciesConfig policiesConfig,
DistributionConfig distributionConfig) {
- this(containerMbusConfig, documentmanagerConfig, loadTypeConfig, slobroksConfig,
- messagebusConfig, policiesConfig, distributionConfig, System.getProperty("config.id")); //:
+ this(nets.net(), containerMbusConfig, documentmanagerConfig,
+ loadTypeConfig, messagebusConfig, policiesConfig, distributionConfig);
+
}
- public SessionCache(ContainerMbusConfig containerMbusConfig, DocumentmanagerConfig documentmanagerConfig,
- LoadTypeConfig loadTypeConfig, SlobroksConfig slobroksConfig,
- MessagebusConfig messagebusConfig, DocumentProtocolPoliciesConfig policiesConfig,
- DistributionConfig distributionConfig, String identity) {
- this(containerMbusConfig,
- slobroksConfig,
+ public SessionCache(NetworkMultiplexer net, ContainerMbusConfig containerMbusConfig,
+ DocumentmanagerConfig documentmanagerConfig,
+ LoadTypeConfig loadTypeConfig, MessagebusConfig messagebusConfig,
+ DocumentProtocolPoliciesConfig policiesConfig,
+ DistributionConfig distributionConfig) {
+ this(net,
+ containerMbusConfig,
messagebusConfig,
- identity,
new DocumentProtocol(new DocumentTypeManager(documentmanagerConfig),
new LoadTypeSet(loadTypeConfig),
policiesConfig,
distributionConfig));
}
- public SessionCache(ContainerMbusConfig containerMbusConfig, SlobroksConfig slobroksConfig,
- MessagebusConfig messagebusConfig, String identity, Protocol protocol) {
- this.messageBus = createSharedMessageBus(containerMbusConfig,
- messagebusConfig,
- slobroksConfig,
- identity,
- protocol);
+ public SessionCache(NetworkMultiplexer net, ContainerMbusConfig containerMbusConfig,
+ MessagebusConfig messagebusConfig, Protocol protocol) {
+ this.messageBus = createSharedMessageBus(net, containerMbusConfig, messagebusConfig, protocol);
}
public void deconstruct() {
messageBus.release();
}
- private static SharedMessageBus createSharedMessageBus(ContainerMbusConfig mbusConfig,
+ private static SharedMessageBus createSharedMessageBus(NetworkMultiplexer net,
+ ContainerMbusConfig mbusConfig,
MessagebusConfig messagebusConfig,
- SlobroksConfig slobroksConfig, String identity,
Protocol protocol) {
MessageBusParams mbusParams = new MessageBusParams().addProtocol(protocol);
@@ -114,17 +112,9 @@ public final class SessionCache extends AbstractComponent {
mbusParams.setMaxPendingCount(mbusConfig.maxpendingcount());
mbusParams.setMaxPendingSize(maxPendingSize);
- RPCNetworkParams netParams = new RPCNetworkParams()
- .setSlobroksConfig(slobroksConfig)
- .setIdentity(new Identity(identity))
- .setListenPort(mbusConfig.port())
- .setNumTargetsPerSpec(mbusConfig.numconnectionspertarget())
- .setNumNetworkThreads(mbusConfig.numthreads())
- .setTransportEventsBeforeWakeup(mbusConfig.transport_events_before_wakeup())
- .setOptimization(RPCNetworkParams.Optimization.valueOf(mbusConfig.optimize_for().name()));
- SharedMessageBus bus = SharedMessageBus.newInstance(mbusParams, netParams);
- new ConfigAgent(messagebusConfig, bus.messageBus()); // Configure the wrapped MessageBus with a routing table.
- return bus;
+ MessageBus bus = new MessageBus(net, mbusParams);
+ new ConfigAgent(messagebusConfig, bus); // Configure the wrapped MessageBus with a routing table.
+ return new SharedMessageBus(bus);
}
private static void logSystemInfo(ContainerMbusConfig containerMbusConfig, long maxPendingSize) {