diff options
Diffstat (limited to 'jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/SharedIntermediateSession.java')
-rw-r--r-- | jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/SharedIntermediateSession.java | 98 |
1 files changed, 98 insertions, 0 deletions
diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/SharedIntermediateSession.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/SharedIntermediateSession.java new file mode 100644 index 00000000000..0c90f602e44 --- /dev/null +++ b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/SharedIntermediateSession.java @@ -0,0 +1,98 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.shared; + +import com.yahoo.jdisc.AbstractResource; +import com.yahoo.jdisc.ResourceReference; +import com.yahoo.log.LogLevel; +import com.yahoo.messagebus.*; +import com.yahoo.messagebus.Error; + +import java.util.concurrent.atomic.AtomicReference; +import java.util.logging.Logger; + +/** + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +public class SharedIntermediateSession extends AbstractResource + implements ClientSession, ServerSession, MessageHandler, ReplyHandler +{ + + private static final Logger log = Logger.getLogger(SharedIntermediateSession.class.getName()); + private final AtomicReference<MessageHandler> msgHandler = new AtomicReference<>(); + private final SharedMessageBus mbus; + private final IntermediateSession session; + private final ResourceReference mbusReference; + + public SharedIntermediateSession(SharedMessageBus mbus, IntermediateSessionParams params) { + if (params.getReplyHandler() != null) { + throw new IllegalArgumentException("Reply handler must be null."); + } + this.mbus = mbus; + this.msgHandler.set(params.getMessageHandler()); + this.session = mbus.messageBus().createIntermediateSession(params.setReplyHandler(this) + .setMessageHandler(this)); + this.mbusReference = mbus.refer(); + } + + public IntermediateSession session() { + return session; + } + + @Override + public Result sendMessage(Message msg) { + session.forward(msg); + return Result.ACCEPTED; + } + + @Override + public void sendReply(Reply reply) { + session.forward(reply); + } + + @Override + public MessageHandler getMessageHandler() { + return msgHandler.get(); + } + + @Override + public void setMessageHandler(MessageHandler msgHandler) { + if (!this.msgHandler.compareAndSet(null, msgHandler)) { + throw new IllegalStateException("Message handler already registered."); + } + } + + @Override + public void handleMessage(Message msg) { + MessageHandler msgHandler = this.msgHandler.get(); + if (msgHandler == null) { + Reply reply = new EmptyReply(); + reply.swapState(msg); + reply.addError(new Error(ErrorCode.SESSION_BUSY, "Session not fully configured yet.")); + sendReply(reply); + return; + } + msgHandler.handleMessage(msg); + } + + @Override + public void handleReply(Reply reply) { + reply.popHandler().handleReply(reply); + } + + @Override + public String connectionSpec() { + return session.getConnectionSpec(); + } + + @Override + public String name() { + return session.getName(); + } + + @Override + protected void destroy() { + log.log(LogLevel.DEBUG, "Destroying shared intermediate session."); + session.destroy(); + mbusReference.close(); + } +} |