diff options
Diffstat (limited to 'container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusServer.java')
-rw-r--r-- | container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusServer.java | 30 |
1 files changed, 22 insertions, 8 deletions
diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusServer.java b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusServer.java index 67badddddd2..a165a0b8c64 100644 --- a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusServer.java +++ b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusServer.java @@ -10,13 +10,19 @@ import com.yahoo.jdisc.handler.ContentChannel; import com.yahoo.jdisc.handler.ResponseHandler; import com.yahoo.jdisc.service.CurrentContainer; import com.yahoo.jdisc.service.ServerProvider; + +import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Level; -import com.yahoo.messagebus.*; + +import com.yahoo.messagebus.EmptyReply; import com.yahoo.messagebus.Error; +import com.yahoo.messagebus.ErrorCode; +import com.yahoo.messagebus.Message; +import com.yahoo.messagebus.MessageHandler; +import com.yahoo.messagebus.Reply; import com.yahoo.messagebus.shared.ServerSession; import java.net.URI; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Logger; /** @@ -24,8 +30,9 @@ import java.util.logging.Logger; */ public final class MbusServer extends AbstractResource implements ServerProvider, MessageHandler { + private enum State {INITIALIZING, RUNNING, STOPPED} private final static Logger log = Logger.getLogger(MbusServer.class.getName()); - private final AtomicBoolean running = new AtomicBoolean(false); + private final AtomicReference<State> runState = new AtomicReference<>(State.INITIALIZING); private final CurrentContainer container; private final ServerSession session; private final URI uri; @@ -44,26 +51,33 @@ public final class MbusServer extends AbstractResource implements ServerProvider public void start() { log.log(Level.FINE, "Starting message bus server."); session.connect(); - running.set(true); + runState.set(State.RUNNING); } @Override public void close() { log.log(Level.FINE, "Closing message bus server."); - running.set(false); + runState.set(State.STOPPED); } @Override protected void destroy() { log.log(Level.FINE, "Destroying message bus server."); - running.set(false); + runState.set(State.STOPPED); sessionReference.close(); } @Override public void handleMessage(Message msg) { - if (!running.get()) { - dispatchErrorReply(msg, ErrorCode.SESSION_BUSY, "Session temporarily closed."); + State state = runState.get(); + if (state == State.INITIALIZING) { + dispatchErrorReply(msg, ErrorCode.SESSION_BUSY, "MBusServer not started."); + return; + } + if (state == State.STOPPED) { + // We might need to detect requests originating from the same JVM, as they nede to fail fast + // As they are holding references to the container preventing proper shutdown. + dispatchErrorReply(msg, ErrorCode.SESSION_BUSY, "MBusServer has been closed."); return; } if (msg.getTrace().shouldTrace(6)) { |