diff options
Diffstat (limited to 'jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/SharedSourceSession.java')
-rw-r--r-- | jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/SharedSourceSession.java | 60 |
1 files changed, 60 insertions, 0 deletions
diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/SharedSourceSession.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/SharedSourceSession.java new file mode 100644 index 00000000000..2ce76a50b73 --- /dev/null +++ b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/SharedSourceSession.java @@ -0,0 +1,60 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.shared; + +import com.yahoo.jdisc.AbstractResource; +import com.yahoo.jdisc.ResourceReference; +import com.yahoo.log.LogLevel; +import com.yahoo.messagebus.Message; +import com.yahoo.messagebus.Reply; +import com.yahoo.messagebus.ReplyHandler; +import com.yahoo.messagebus.Result; +import com.yahoo.messagebus.SourceSession; +import com.yahoo.messagebus.SourceSessionParams; + +import java.util.logging.Logger; + +/** + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +public class SharedSourceSession extends AbstractResource implements ClientSession, ReplyHandler { + + private static final Logger log = Logger.getLogger(SharedSourceSession.class.getName()); + private final SharedMessageBus mbus; + private final SourceSession session; + private final ResourceReference mbusReference; + + public SharedSourceSession(SharedMessageBus mbus, SourceSessionParams params) { + if (params.getReplyHandler() != null) { + throw new IllegalArgumentException("Reply handler must be null."); + } + this.mbus = mbus; + this.session = mbus.messageBus().createSourceSession(params.setReplyHandler(this)); + this.mbusReference = mbus.refer(); + } + + public SourceSession session() { + return session; + } + + @Override + public Result sendMessage(Message msg) { + return session.send(msg); + } + + public Result sendMessageBlocking(Message msg) throws InterruptedException { + return session.sendBlocking(msg); + } + + @Override + public void handleReply(Reply reply) { + reply.popHandler().handleReply(reply); + } + + @Override + protected void destroy() { + log.log(LogLevel.DEBUG, "Destroying shared source session."); + session.close(); + mbusReference.close(); + } + +} |