aboutsummaryrefslogtreecommitdiffstats
path: root/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusServer.java
diff options
context:
space:
mode:
authorgjoranv <gv@verizonmedia.com>2021-03-29 21:14:20 +0200
committergjoranv <gv@verizonmedia.com>2021-03-29 21:14:20 +0200
commit2c7ee7bc9d3e588f11ce23d66f8c111109c8a2fc (patch)
tree148ff85b5ea211859cdc863ab24fe568060e78d0 /container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusServer.java
parent09cbb21dbfba2c80660945c84b1b2d9a0fffaf24 (diff)
Add source code from jdisc_mbus_service into container-messagebus.
Diffstat (limited to 'container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusServer.java')
-rw-r--r--container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusServer.java135
1 files changed, 135 insertions, 0 deletions
diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusServer.java b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusServer.java
new file mode 100644
index 00000000000..e26e1e7e134
--- /dev/null
+++ b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusServer.java
@@ -0,0 +1,135 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.jdisc;
+
+import com.google.inject.Inject;
+import com.yahoo.jdisc.AbstractResource;
+import com.yahoo.jdisc.Request;
+import com.yahoo.jdisc.ResourceReference;
+import com.yahoo.jdisc.Response;
+import com.yahoo.jdisc.handler.ContentChannel;
+import com.yahoo.jdisc.handler.ResponseHandler;
+import com.yahoo.jdisc.service.CurrentContainer;
+import com.yahoo.jdisc.service.ServerProvider;
+import java.util.logging.Level;
+import com.yahoo.messagebus.*;
+import com.yahoo.messagebus.Error;
+import com.yahoo.messagebus.shared.ServerSession;
+
+import java.net.URI;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Logger;
+
+/**
+ * @author Simon Thoresen Hult
+ */
+public final class MbusServer extends AbstractResource implements ServerProvider, MessageHandler {
+
+ private final static Logger log = Logger.getLogger(MbusServer.class.getName());
+ private final AtomicBoolean running = new AtomicBoolean(false);
+ private final CurrentContainer container;
+ private final ServerSession session;
+ private final URI uri;
+ private final ResourceReference sessionReference;
+
+ @Inject
+ public MbusServer(CurrentContainer container, ServerSession session) {
+ this.container = container;
+ this.session = session;
+ uri = URI.create("mbus://localhost/" + session.name());
+ session.setMessageHandler(this);
+ sessionReference = session.refer();
+ }
+
+ @Override
+ public void start() {
+ log.log(Level.FINE, "Starting message bus server.");
+ running.set(true);
+ }
+
+ @Override
+ public void close() {
+ log.log(Level.FINE, "Closing message bus server.");
+ running.set(false);
+ }
+
+ @Override
+ protected void destroy() {
+ log.log(Level.FINE, "Destroying message bus server.");
+ running.set(false);
+ sessionReference.close();
+ }
+
+ @Override
+ public void handleMessage(Message msg) {
+ if (!running.get()) {
+ dispatchErrorReply(msg, ErrorCode.SESSION_BUSY, "Session temporarily closed.");
+ return;
+ }
+ if (msg.getTrace().shouldTrace(6)) {
+ msg.getTrace().trace(6, "Message received by MbusServer.");
+ }
+ Request request = null;
+ ContentChannel content = null;
+ try {
+ request = new MbusRequest(container, uri, msg);
+ content = request.connect(new ServerResponseHandler(msg));
+ } catch (RuntimeException e) {
+ dispatchErrorReply(msg, ErrorCode.APP_FATAL_ERROR, e.toString());
+ } finally {
+ if (request != null) {
+ request.release();
+ }
+ }
+ if (content != null) {
+ content.close(IgnoredCompletionHandler.INSTANCE);
+ }
+ }
+
+ public String connectionSpec() {
+ return session.connectionSpec();
+ }
+
+ private void dispatchErrorReply(Message msg, int errCode, String errMsg) {
+ Reply reply = new EmptyReply();
+ reply.swapState(msg);
+ reply.addError(new Error(errCode, errMsg));
+ session.sendReply(reply);
+ }
+
+ private class ServerResponseHandler implements ResponseHandler {
+
+ final Message msg;
+
+ ServerResponseHandler(Message msg) {
+ this.msg = msg;
+ }
+
+ @Override
+ public ContentChannel handleResponse(Response response) {
+ Reply reply;
+ if (response instanceof MbusResponse) {
+ reply = ((MbusResponse)response).getReply();
+ } else {
+ reply = new EmptyReply();
+ reply.swapState(msg);
+ }
+ Error err = StatusCodes.toMbusError(response.getStatus());
+ if (err != null) {
+ if (err.isFatal()) {
+ if (!reply.hasFatalErrors()) {
+ reply.addError(err);
+ }
+ } else {
+ if (!reply.hasErrors()) {
+ reply.addError(err);
+ }
+ }
+ }
+ if (reply.getTrace().shouldTrace(6)) {
+ reply.getTrace().trace(6, "Sending reply from MbusServer.");
+ }
+ session.sendReply(reply);
+ return null;
+ }
+ }
+}