diff options
author | jonmv <venstad@gmail.com> | 2023-11-30 13:13:27 +0100 |
---|---|---|
committer | jonmv <venstad@gmail.com> | 2023-11-30 13:13:27 +0100 |
commit | 3491100f5bee54d0918f3f9ba33d9b0b44026f55 (patch) | |
tree | d7538cd3a838ca76441559a807c1c113fc839b5b | |
parent | 7dde0ab9f6d19e25d55f1b1b4798f83b3ba6592d (diff) |
Ensure mbus is not shut down while processing mbus requests
4 files changed, 79 insertions, 20 deletions
diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusServer.java b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusServer.java index 42fbec7711d..3910c5c8a43 100644 --- a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusServer.java +++ b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusServer.java @@ -28,7 +28,7 @@ import java.util.logging.Logger; */ public final class MbusServer extends AbstractResource implements ServerProvider, MessageHandler { - private enum State {INITIALIZING, RUNNING, STOPPED} + private enum State { INITIALIZING, RUNNING, STOPPED } private final static Logger log = Logger.getLogger(MbusServer.class.getName()); private final AtomicReference<State> runState = new AtomicReference<>(State.INITIALIZING); private final CurrentContainer container; @@ -91,7 +91,10 @@ public final class MbusServer extends AbstractResource implements ServerProvider Request request = null; ContentChannel content = null; try { - request = new MbusRequest(container, uri, msg); + request = new MbusRequest(container, uri, msg) { + final ResourceReference sessionReference = session.refer(); + @Override protected void destroy() { try (sessionReference) { super.destroy(); } } + }; content = request.connect(new ServerResponseHandler(msg)); } catch (RuntimeException e) { dispatchErrorReply(msg, ErrorCode.APP_FATAL_ERROR, e.toString()); @@ -152,4 +155,5 @@ public final class MbusServer extends AbstractResource implements ServerProvider return null; } } + } diff --git a/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusServerTestCase.java b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusServerTestCase.java index 5eee34ab370..58b9ebb108e 100644 --- a/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusServerTestCase.java +++ b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusServerTestCase.java @@ -2,13 +2,24 @@ package com.yahoo.messagebus.jdisc; import com.google.inject.AbstractModule; +import com.yahoo.jdisc.Container; import com.yahoo.jdisc.Request; import com.yahoo.jdisc.ResourceReference; import com.yahoo.jdisc.Response; import com.yahoo.jdisc.application.BindingSetSelector; -import com.yahoo.jdisc.handler.*; +import com.yahoo.jdisc.handler.AbstractRequestHandler; +import com.yahoo.jdisc.handler.CompletionHandler; +import com.yahoo.jdisc.handler.ContentChannel; +import com.yahoo.jdisc.handler.RequestDeniedException; +import com.yahoo.jdisc.handler.RequestHandler; +import com.yahoo.jdisc.handler.ResponseHandler; +import com.yahoo.jdisc.service.CurrentContainer; import com.yahoo.messagebus.Error; -import com.yahoo.messagebus.*; +import com.yahoo.messagebus.Error; +import com.yahoo.messagebus.ErrorCode; +import com.yahoo.messagebus.Message; +import com.yahoo.messagebus.MessageHandler; +import com.yahoo.messagebus.Reply; import com.yahoo.messagebus.jdisc.test.ServerTestDriver; import com.yahoo.messagebus.shared.ServerSession; import com.yahoo.messagebus.test.SimpleMessage; @@ -23,8 +34,12 @@ import java.util.LinkedList; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; /** * @author Simon Thoresen Hult @@ -32,6 +47,50 @@ import static org.junit.Assert.*; public class MbusServerTestCase { @Test + public void requireThatServerRequestRetainsSession() { + AtomicReference<ContentChannel> responseChannel = new AtomicReference<>(); + Container container = new Container() { + @Override public long currentTimeMillis() { return 0; } + @Override public void release() { } + @Override public RequestHandler resolveHandler(Request request) { + return new AbstractRequestHandler() { + @Override public ContentChannel handleRequest(Request request, ResponseHandler handler) { + responseChannel.set(handler.handleResponse(new Response(Response.Status.OK))); + return null; + } + }; + } + @Override public <T> T getInstance(Class<T> type) { return null; } + }; + CurrentContainer current = new CurrentContainer() { + @Override public Container newReference(URI uri) { return container; } + }; + + MySession session = new MySession(); + assertEquals(1, session.refCount); + MbusServer server = new MbusServer(current, session); + assertEquals(2, session.refCount); + + server.handleMessage(new SimpleMessage("too early")); + assertEquals(2, session.refCount); + + server.start(); + server.handleMessage(new SimpleMessage("retained")); + assertEquals(3, session.refCount); + + session.release(); + assertEquals(2, session.refCount); + server.destroy(); + assertEquals(1, session.refCount); + + server.handleMessage(new SimpleMessage("too late")); + assertEquals(1, session.refCount); + + responseChannel.get().close(null); + assertEquals(0, session.refCount); + } + + @Test public void requireThatServerRetainsSession() { MySession session = new MySession(); assertEquals(1, session.refCount); diff --git a/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java b/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java index 4a443a9fde5..b4e57876bc2 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java @@ -267,10 +267,10 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler, * values for the {@link IntermediateSessionParams} object.</p> * * @param name The local unique name for the created session. - * @param broadcastName Whether or not to broadcast this session's name on + * @param broadcastName Whether to broadcast this session's name on * the network. * @param msgHandler The handler to receive the messages for the session. - * @param replyHandler The handler to received the replies for the session. + * @param replyHandler The handler to receive the replies for the session. * @return The created session. */ public IntermediateSession createIntermediateSession(String name, diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalNetwork.java b/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalNetwork.java index 44f29df0e91..beef238288a 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalNetwork.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalNetwork.java @@ -87,19 +87,15 @@ public class LocalNetwork implements Network { private void receiveLater(MessageEnvelope envelope) { byte[] payload = envelope.sender.encode(envelope.msg.getProtocol(), envelope.msg); - executor.execute(new Runnable() { - - @Override - public void run() { - Message msg = decode(envelope.msg.getProtocol(), payload, Message.class); - msg.getTrace().setLevel(envelope.msg.getTrace().getLevel()); - msg.setRoute(envelope.msg.getRoute()).getRoute().removeHop(0); - msg.setRetryEnabled(envelope.msg.getRetryEnabled()); - msg.setRetry(envelope.msg.getRetry()); - msg.setTimeRemaining(envelope.msg.getTimeRemainingNow()); - msg.pushHandler(reply -> new ReplyEnvelope(LocalNetwork.this, envelope, reply).send()); - owner.deliverMessage(msg, ((LocalServiceAddress) envelope.recipient.getServiceAddress()).getSessionName()); - } + executor.execute(() -> { + Message msg = decode(envelope.msg.getProtocol(), payload, Message.class); + msg.getTrace().setLevel(envelope.msg.getTrace().getLevel()); + msg.setRoute(envelope.msg.getRoute()).getRoute().removeHop(0); + msg.setRetryEnabled(envelope.msg.getRetryEnabled()); + msg.setRetry(envelope.msg.getRetry()); + msg.setTimeRemaining(envelope.msg.getTimeRemainingNow()); + msg.pushHandler(reply -> new ReplyEnvelope(LocalNetwork.this, envelope, reply).send()); + owner.deliverMessage(msg, ((LocalServiceAddress) envelope.recipient.getServiceAddress()).getSessionName()); }); } |