diff options
Diffstat (limited to 'container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedIntermediateSession.java')
-rw-r--r-- | container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedIntermediateSession.java | 104 |
1 files changed, 104 insertions, 0 deletions
diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedIntermediateSession.java b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedIntermediateSession.java new file mode 100644 index 00000000000..5c9fab46e34 --- /dev/null +++ b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedIntermediateSession.java @@ -0,0 +1,104 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.shared; + +import com.yahoo.jdisc.AbstractResource; +import com.yahoo.jdisc.ResourceReference; +import java.util.logging.Level; +import com.yahoo.messagebus.EmptyReply; +import com.yahoo.messagebus.Error; +import com.yahoo.messagebus.ErrorCode; +import com.yahoo.messagebus.IntermediateSession; +import com.yahoo.messagebus.IntermediateSessionParams; +import com.yahoo.messagebus.Message; +import com.yahoo.messagebus.MessageHandler; +import com.yahoo.messagebus.Reply; +import com.yahoo.messagebus.ReplyHandler; +import com.yahoo.messagebus.Result; + +import java.util.concurrent.atomic.AtomicReference; +import java.util.logging.Logger; + +/** + * @author Simon Thoresen Hult + */ +public class SharedIntermediateSession extends AbstractResource + implements ClientSession, ServerSession, MessageHandler, ReplyHandler +{ + + private static final Logger log = Logger.getLogger(SharedIntermediateSession.class.getName()); + private final AtomicReference<MessageHandler> msgHandler = new AtomicReference<>(); + private final IntermediateSession session; + private final ResourceReference mbusReference; + + public SharedIntermediateSession(SharedMessageBus mbus, IntermediateSessionParams params) { + if (params.getReplyHandler() != null) { + throw new IllegalArgumentException("Reply handler must be null."); + } + this.msgHandler.set(params.getMessageHandler()); + this.session = mbus.messageBus().createIntermediateSession(params.setReplyHandler(this) + .setMessageHandler(this)); + this.mbusReference = mbus.refer(); + } + + public IntermediateSession session() { + return session; + } + + @Override + public Result sendMessage(Message msg) { + session.forward(msg); + return Result.ACCEPTED; + } + + @Override + public void sendReply(Reply reply) { + session.forward(reply); + } + + @Override + public MessageHandler getMessageHandler() { + return msgHandler.get(); + } + + @Override + public void setMessageHandler(MessageHandler msgHandler) { + if (!this.msgHandler.compareAndSet(null, msgHandler)) { + throw new IllegalStateException("Message handler already registered."); + } + } + + @Override + public void handleMessage(Message msg) { + MessageHandler msgHandler = this.msgHandler.get(); + if (msgHandler == null) { + Reply reply = new EmptyReply(); + reply.swapState(msg); + reply.addError(new Error(ErrorCode.SESSION_BUSY, "Session not fully configured yet.")); + sendReply(reply); + return; + } + msgHandler.handleMessage(msg); + } + + @Override + public void handleReply(Reply reply) { + reply.popHandler().handleReply(reply); + } + + @Override + public String connectionSpec() { + return session.getConnectionSpec(); + } + + @Override + public String name() { + return session.getName(); + } + + @Override + protected void destroy() { + log.log(Level.FINE, "Destroying shared intermediate session."); + session.destroy(); + mbusReference.close(); + } +} |