From 2c7ee7bc9d3e588f11ce23d66f8c111109c8a2fc Mon Sep 17 00:00:00 2001 From: gjoranv Date: Mon, 29 Mar 2021 21:14:20 +0200 Subject: Add source code from jdisc_mbus_service into container-messagebus. --- .../yahoo/messagebus/shared/SharedMessageBus.java | 68 ++++++++++++++++++++++ 1 file changed, 68 insertions(+) create mode 100644 container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedMessageBus.java (limited to 'container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedMessageBus.java') diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedMessageBus.java b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedMessageBus.java new file mode 100644 index 00000000000..dd135a51378 --- /dev/null +++ b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedMessageBus.java @@ -0,0 +1,68 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.shared; + +import com.yahoo.config.subscription.ConfigGetter; +import com.yahoo.jdisc.AbstractResource; +import java.util.logging.Level; +import com.yahoo.messagebus.DestinationSessionParams; +import com.yahoo.messagebus.IntermediateSessionParams; +import com.yahoo.messagebus.MessageBus; +import com.yahoo.messagebus.MessageBusParams; +import com.yahoo.messagebus.SourceSessionParams; +import com.yahoo.messagebus.network.Network; +import com.yahoo.messagebus.network.rpc.RPCNetwork; +import com.yahoo.messagebus.network.rpc.RPCNetworkParams; +import com.yahoo.cloud.config.SlobroksConfig; + +import java.util.logging.Logger; + +/** + * @author Simon Thoresen Hult + */ +public class SharedMessageBus extends AbstractResource { + + private static final Logger log = Logger.getLogger(SharedMessageBus.class.getName()); + private final MessageBus mbus; + + public SharedMessageBus(MessageBus mbus) { + mbus.getClass(); // throws NullPointerException + this.mbus = mbus; + } + + public MessageBus messageBus() { + return mbus; + } + + @Override + protected void destroy() { + log.log(Level.FINE, "Destroying shared message bus."); + mbus.destroy(); + } + + public SharedSourceSession newSourceSession(SourceSessionParams params) { + return new SharedSourceSession(this, params); + } + + public SharedIntermediateSession newIntermediateSession(IntermediateSessionParams params) { + return new SharedIntermediateSession(this, params); + } + + public SharedDestinationSession newDestinationSession(DestinationSessionParams params) { + return new SharedDestinationSession(this, params); + } + + public static SharedMessageBus newInstance(MessageBusParams mbusParams, RPCNetworkParams netParams) { + return new SharedMessageBus(new MessageBus(newNetwork(netParams), mbusParams)); + } + + private static Network newNetwork(RPCNetworkParams params) { + SlobroksConfig cfg = params.getSlobroksConfig(); + if (cfg == null) { + cfg = ConfigGetter.getConfig(SlobroksConfig.class, params.getSlobrokConfigId()); + } + if (cfg.slobrok().isEmpty()) { + return new NullNetwork(); // for LocalApplication + } + return new RPCNetwork(params); + } +} -- cgit v1.2.3