diff options
author | gjoranv <gv@verizonmedia.com> | 2021-03-29 21:14:20 +0200 |
---|---|---|
committer | gjoranv <gv@verizonmedia.com> | 2021-03-29 21:14:20 +0200 |
commit | 2c7ee7bc9d3e588f11ce23d66f8c111109c8a2fc (patch) | |
tree | 148ff85b5ea211859cdc863ab24fe568060e78d0 /container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusServer.java | |
parent | 09cbb21dbfba2c80660945c84b1b2d9a0fffaf24 (diff) |
Add source code from jdisc_mbus_service into container-messagebus.
Diffstat (limited to 'container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusServer.java')
-rw-r--r-- | container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusServer.java | 135 |
1 files changed, 135 insertions, 0 deletions
diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusServer.java b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusServer.java new file mode 100644 index 00000000000..e26e1e7e134 --- /dev/null +++ b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusServer.java @@ -0,0 +1,135 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.jdisc; + +import com.google.inject.Inject; +import com.yahoo.jdisc.AbstractResource; +import com.yahoo.jdisc.Request; +import com.yahoo.jdisc.ResourceReference; +import com.yahoo.jdisc.Response; +import com.yahoo.jdisc.handler.ContentChannel; +import com.yahoo.jdisc.handler.ResponseHandler; +import com.yahoo.jdisc.service.CurrentContainer; +import com.yahoo.jdisc.service.ServerProvider; +import java.util.logging.Level; +import com.yahoo.messagebus.*; +import com.yahoo.messagebus.Error; +import com.yahoo.messagebus.shared.ServerSession; + +import java.net.URI; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.logging.Logger; + +/** + * @author Simon Thoresen Hult + */ +public final class MbusServer extends AbstractResource implements ServerProvider, MessageHandler { + + private final static Logger log = Logger.getLogger(MbusServer.class.getName()); + private final AtomicBoolean running = new AtomicBoolean(false); + private final CurrentContainer container; + private final ServerSession session; + private final URI uri; + private final ResourceReference sessionReference; + + @Inject + public MbusServer(CurrentContainer container, ServerSession session) { + this.container = container; + this.session = session; + uri = URI.create("mbus://localhost/" + session.name()); + session.setMessageHandler(this); + sessionReference = session.refer(); + } + + @Override + public void start() { + log.log(Level.FINE, "Starting message bus server."); + running.set(true); + } + + @Override + public void close() { + log.log(Level.FINE, "Closing message bus server."); + running.set(false); + } + + @Override + protected void destroy() { + log.log(Level.FINE, "Destroying message bus server."); + running.set(false); + sessionReference.close(); + } + + @Override + public void handleMessage(Message msg) { + if (!running.get()) { + dispatchErrorReply(msg, ErrorCode.SESSION_BUSY, "Session temporarily closed."); + return; + } + if (msg.getTrace().shouldTrace(6)) { + msg.getTrace().trace(6, "Message received by MbusServer."); + } + Request request = null; + ContentChannel content = null; + try { + request = new MbusRequest(container, uri, msg); + content = request.connect(new ServerResponseHandler(msg)); + } catch (RuntimeException e) { + dispatchErrorReply(msg, ErrorCode.APP_FATAL_ERROR, e.toString()); + } finally { + if (request != null) { + request.release(); + } + } + if (content != null) { + content.close(IgnoredCompletionHandler.INSTANCE); + } + } + + public String connectionSpec() { + return session.connectionSpec(); + } + + private void dispatchErrorReply(Message msg, int errCode, String errMsg) { + Reply reply = new EmptyReply(); + reply.swapState(msg); + reply.addError(new Error(errCode, errMsg)); + session.sendReply(reply); + } + + private class ServerResponseHandler implements ResponseHandler { + + final Message msg; + + ServerResponseHandler(Message msg) { + this.msg = msg; + } + + @Override + public ContentChannel handleResponse(Response response) { + Reply reply; + if (response instanceof MbusResponse) { + reply = ((MbusResponse)response).getReply(); + } else { + reply = new EmptyReply(); + reply.swapState(msg); + } + Error err = StatusCodes.toMbusError(response.getStatus()); + if (err != null) { + if (err.isFatal()) { + if (!reply.hasFatalErrors()) { + reply.addError(err); + } + } else { + if (!reply.hasErrors()) { + reply.addError(err); + } + } + } + if (reply.getTrace().shouldTrace(6)) { + reply.getTrace().trace(6, "Sending reply from MbusServer."); + } + session.sendReply(reply); + return null; + } + } +} |