diff options
Diffstat (limited to 'container-messagebus/src/main/java/com/yahoo/messagebus/shared')
8 files changed, 432 insertions, 0 deletions
diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/shared/ClientSession.java b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/ClientSession.java new file mode 100644 index 00000000000..0964a254cf2 --- /dev/null +++ b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/ClientSession.java @@ -0,0 +1,14 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.shared; + +import com.yahoo.jdisc.SharedResource; +import com.yahoo.messagebus.Message; +import com.yahoo.messagebus.Result; + +/** + * @author Simon Thoresen Hult + */ +public interface ClientSession extends SharedResource { + + public Result sendMessage(Message msg); +} diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/shared/NullNetwork.java b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/NullNetwork.java new file mode 100644 index 00000000000..ad58d6b9a5e --- /dev/null +++ b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/NullNetwork.java @@ -0,0 +1,73 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.shared; + +import com.yahoo.jrt.slobrok.api.IMirror; +import com.yahoo.messagebus.Message; +import com.yahoo.messagebus.network.Network; +import com.yahoo.messagebus.network.NetworkOwner; +import com.yahoo.messagebus.routing.RoutingNode; + +import java.util.List; + +/** + * <p>Used by SharedMessageBus as a network when the container runs in LocalApplication with no network services.</p> + * + * @author <a href="mailto:vegardh@yahoo-inc.com">Vegard Havdal</a> + */ +class NullNetwork implements Network { + + @Override + public boolean waitUntilReady(double seconds) { + return true; + } + + @Override + public void attach(NetworkOwner owner) { + + } + + @Override + public void registerSession(String session) { + + } + + @Override + public void unregisterSession(String session) { + + } + + @Override + public boolean allocServiceAddress(RoutingNode recipient) { + return false; + } + + @Override + public void freeServiceAddress(RoutingNode recipient) { + + } + + @Override + public void send(Message msg, List<RoutingNode> recipients) { + + } + + @Override + public void sync() { + + } + + @Override + public void shutdown() { + + } + + @Override + public String getConnectionSpec() { + return null; + } + + @Override + public IMirror getMirror() { + return null; + } +} diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/shared/ServerSession.java b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/ServerSession.java new file mode 100644 index 00000000000..56713815c7a --- /dev/null +++ b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/ServerSession.java @@ -0,0 +1,22 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.shared; + +import com.yahoo.jdisc.SharedResource; +import com.yahoo.messagebus.MessageHandler; +import com.yahoo.messagebus.Reply; + +/** + * @author Simon Thoresen Hult + */ +public interface ServerSession extends SharedResource { + + public MessageHandler getMessageHandler(); + + public void setMessageHandler(MessageHandler msgHandler); + + public void sendReply(Reply reply); + + public String connectionSpec(); + + public String name(); +} diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedDestinationSession.java b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedDestinationSession.java new file mode 100644 index 00000000000..7da164757cd --- /dev/null +++ b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedDestinationSession.java @@ -0,0 +1,85 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.shared; + +import com.yahoo.jdisc.AbstractResource; +import com.yahoo.jdisc.ResourceReference; +import java.util.logging.Level; +import com.yahoo.messagebus.DestinationSession; +import com.yahoo.messagebus.DestinationSessionParams; +import com.yahoo.messagebus.EmptyReply; +import com.yahoo.messagebus.Error; +import com.yahoo.messagebus.ErrorCode; +import com.yahoo.messagebus.Message; +import com.yahoo.messagebus.MessageHandler; +import com.yahoo.messagebus.Reply; + +import java.util.concurrent.atomic.AtomicReference; +import java.util.logging.Logger; + +/** + * @author Simon Thoresen Hult + */ +public class SharedDestinationSession extends AbstractResource implements MessageHandler, ServerSession { + + private static final Logger log = Logger.getLogger(SharedDestinationSession.class.getName()); + private final AtomicReference<MessageHandler> msgHandler = new AtomicReference<>(); + private final DestinationSession session; + private final ResourceReference mbusReference; + + SharedDestinationSession(SharedMessageBus mbus, DestinationSessionParams params) { + this.msgHandler.set(params.getMessageHandler()); + this.session = mbus.messageBus().createDestinationSession(params.setMessageHandler(this)); + this.mbusReference = mbus.refer(); + } + + public DestinationSession session() { + return session; + } + + @Override + public void sendReply(Reply reply) { + session.reply(reply); + } + + @Override + public MessageHandler getMessageHandler() { + return msgHandler.get(); + } + + @Override + public void setMessageHandler(MessageHandler msgHandler) { + if (!this.msgHandler.compareAndSet(null, msgHandler)) { + throw new IllegalStateException("Message handler already registered."); + } + } + + @Override + public void handleMessage(Message msg) { + MessageHandler msgHandler = this.msgHandler.get(); + if (msgHandler == null) { + Reply reply = new EmptyReply(); + reply.swapState(msg); + reply.addError(new Error(ErrorCode.SESSION_BUSY, "Session not fully configured yet.")); + sendReply(reply); + return; + } + msgHandler.handleMessage(msg); + } + + @Override + public String connectionSpec() { + return session.getConnectionSpec(); + } + + @Override + public String name() { + return session.getName(); + } + + @Override + protected void destroy() { + log.log(Level.FINE, "Destroying shared destination session."); + session.destroy(); + mbusReference.close(); + } +} diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedIntermediateSession.java b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedIntermediateSession.java new file mode 100644 index 00000000000..5c9fab46e34 --- /dev/null +++ b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedIntermediateSession.java @@ -0,0 +1,104 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.shared; + +import com.yahoo.jdisc.AbstractResource; +import com.yahoo.jdisc.ResourceReference; +import java.util.logging.Level; +import com.yahoo.messagebus.EmptyReply; +import com.yahoo.messagebus.Error; +import com.yahoo.messagebus.ErrorCode; +import com.yahoo.messagebus.IntermediateSession; +import com.yahoo.messagebus.IntermediateSessionParams; +import com.yahoo.messagebus.Message; +import com.yahoo.messagebus.MessageHandler; +import com.yahoo.messagebus.Reply; +import com.yahoo.messagebus.ReplyHandler; +import com.yahoo.messagebus.Result; + +import java.util.concurrent.atomic.AtomicReference; +import java.util.logging.Logger; + +/** + * @author Simon Thoresen Hult + */ +public class SharedIntermediateSession extends AbstractResource + implements ClientSession, ServerSession, MessageHandler, ReplyHandler +{ + + private static final Logger log = Logger.getLogger(SharedIntermediateSession.class.getName()); + private final AtomicReference<MessageHandler> msgHandler = new AtomicReference<>(); + private final IntermediateSession session; + private final ResourceReference mbusReference; + + public SharedIntermediateSession(SharedMessageBus mbus, IntermediateSessionParams params) { + if (params.getReplyHandler() != null) { + throw new IllegalArgumentException("Reply handler must be null."); + } + this.msgHandler.set(params.getMessageHandler()); + this.session = mbus.messageBus().createIntermediateSession(params.setReplyHandler(this) + .setMessageHandler(this)); + this.mbusReference = mbus.refer(); + } + + public IntermediateSession session() { + return session; + } + + @Override + public Result sendMessage(Message msg) { + session.forward(msg); + return Result.ACCEPTED; + } + + @Override + public void sendReply(Reply reply) { + session.forward(reply); + } + + @Override + public MessageHandler getMessageHandler() { + return msgHandler.get(); + } + + @Override + public void setMessageHandler(MessageHandler msgHandler) { + if (!this.msgHandler.compareAndSet(null, msgHandler)) { + throw new IllegalStateException("Message handler already registered."); + } + } + + @Override + public void handleMessage(Message msg) { + MessageHandler msgHandler = this.msgHandler.get(); + if (msgHandler == null) { + Reply reply = new EmptyReply(); + reply.swapState(msg); + reply.addError(new Error(ErrorCode.SESSION_BUSY, "Session not fully configured yet.")); + sendReply(reply); + return; + } + msgHandler.handleMessage(msg); + } + + @Override + public void handleReply(Reply reply) { + reply.popHandler().handleReply(reply); + } + + @Override + public String connectionSpec() { + return session.getConnectionSpec(); + } + + @Override + public String name() { + return session.getName(); + } + + @Override + protected void destroy() { + log.log(Level.FINE, "Destroying shared intermediate session."); + session.destroy(); + mbusReference.close(); + } +} diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedMessageBus.java b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedMessageBus.java new file mode 100644 index 00000000000..dd135a51378 --- /dev/null +++ b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedMessageBus.java @@ -0,0 +1,68 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.shared; + +import com.yahoo.config.subscription.ConfigGetter; +import com.yahoo.jdisc.AbstractResource; +import java.util.logging.Level; +import com.yahoo.messagebus.DestinationSessionParams; +import com.yahoo.messagebus.IntermediateSessionParams; +import com.yahoo.messagebus.MessageBus; +import com.yahoo.messagebus.MessageBusParams; +import com.yahoo.messagebus.SourceSessionParams; +import com.yahoo.messagebus.network.Network; +import com.yahoo.messagebus.network.rpc.RPCNetwork; +import com.yahoo.messagebus.network.rpc.RPCNetworkParams; +import com.yahoo.cloud.config.SlobroksConfig; + +import java.util.logging.Logger; + +/** + * @author Simon Thoresen Hult + */ +public class SharedMessageBus extends AbstractResource { + + private static final Logger log = Logger.getLogger(SharedMessageBus.class.getName()); + private final MessageBus mbus; + + public SharedMessageBus(MessageBus mbus) { + mbus.getClass(); // throws NullPointerException + this.mbus = mbus; + } + + public MessageBus messageBus() { + return mbus; + } + + @Override + protected void destroy() { + log.log(Level.FINE, "Destroying shared message bus."); + mbus.destroy(); + } + + public SharedSourceSession newSourceSession(SourceSessionParams params) { + return new SharedSourceSession(this, params); + } + + public SharedIntermediateSession newIntermediateSession(IntermediateSessionParams params) { + return new SharedIntermediateSession(this, params); + } + + public SharedDestinationSession newDestinationSession(DestinationSessionParams params) { + return new SharedDestinationSession(this, params); + } + + public static SharedMessageBus newInstance(MessageBusParams mbusParams, RPCNetworkParams netParams) { + return new SharedMessageBus(new MessageBus(newNetwork(netParams), mbusParams)); + } + + private static Network newNetwork(RPCNetworkParams params) { + SlobroksConfig cfg = params.getSlobroksConfig(); + if (cfg == null) { + cfg = ConfigGetter.getConfig(SlobroksConfig.class, params.getSlobrokConfigId()); + } + if (cfg.slobrok().isEmpty()) { + return new NullNetwork(); // for LocalApplication + } + return new RPCNetwork(params); + } +} diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedSourceSession.java b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedSourceSession.java new file mode 100644 index 00000000000..56071682349 --- /dev/null +++ b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedSourceSession.java @@ -0,0 +1,58 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.shared; + +import com.yahoo.jdisc.AbstractResource; +import com.yahoo.jdisc.ResourceReference; +import java.util.logging.Level; +import com.yahoo.messagebus.Message; +import com.yahoo.messagebus.Reply; +import com.yahoo.messagebus.ReplyHandler; +import com.yahoo.messagebus.Result; +import com.yahoo.messagebus.SourceSession; +import com.yahoo.messagebus.SourceSessionParams; + +import java.util.logging.Logger; + +/** + * @author Simon Thoresen Hult + */ +public class SharedSourceSession extends AbstractResource implements ClientSession, ReplyHandler { + + private static final Logger log = Logger.getLogger(SharedSourceSession.class.getName()); + private final SourceSession session; + private final ResourceReference mbusReference; + + public SharedSourceSession(SharedMessageBus mbus, SourceSessionParams params) { + if (params.getReplyHandler() != null) { + throw new IllegalArgumentException("Reply handler must be null."); + } + this.session = mbus.messageBus().createSourceSession(params.setReplyHandler(this)); + this.mbusReference = mbus.refer(); + } + + public SourceSession session() { + return session; + } + + @Override + public Result sendMessage(Message msg) { + return session.send(msg); + } + + public Result sendMessageBlocking(Message msg) throws InterruptedException { + return session.sendBlocking(msg); + } + + @Override + public void handleReply(Reply reply) { + reply.popHandler().handleReply(reply); + } + + @Override + protected void destroy() { + log.log(Level.FINE, "Destroying shared source session."); + session.close(); + mbusReference.close(); + } + +} diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/shared/package-info.java b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/package-info.java new file mode 100644 index 00000000000..941a0dc4c5c --- /dev/null +++ b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/package-info.java @@ -0,0 +1,8 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +/** + * Not a public API, exported for use in internal components. + */ +@ExportPackage +package com.yahoo.messagebus.shared; + +import com.yahoo.osgi.annotation.ExportPackage; |