aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorjonmv <venstad@gmail.com>2023-11-30 13:13:27 +0100
committerjonmv <venstad@gmail.com>2023-11-30 13:13:27 +0100
commit3491100f5bee54d0918f3f9ba33d9b0b44026f55 (patch)
treed7538cd3a838ca76441559a807c1c113fc839b5b
parent7dde0ab9f6d19e25d55f1b1b4798f83b3ba6592d (diff)
Ensure mbus is not shut down while processing mbus requests
-rw-r--r--container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusServer.java8
-rw-r--r--container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusServerTestCase.java65
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java4
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalNetwork.java22
4 files changed, 79 insertions, 20 deletions
diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusServer.java b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusServer.java
index 42fbec7711d..3910c5c8a43 100644
--- a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusServer.java
+++ b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusServer.java
@@ -28,7 +28,7 @@ import java.util.logging.Logger;
*/
public final class MbusServer extends AbstractResource implements ServerProvider, MessageHandler {
- private enum State {INITIALIZING, RUNNING, STOPPED}
+ private enum State { INITIALIZING, RUNNING, STOPPED }
private final static Logger log = Logger.getLogger(MbusServer.class.getName());
private final AtomicReference<State> runState = new AtomicReference<>(State.INITIALIZING);
private final CurrentContainer container;
@@ -91,7 +91,10 @@ public final class MbusServer extends AbstractResource implements ServerProvider
Request request = null;
ContentChannel content = null;
try {
- request = new MbusRequest(container, uri, msg);
+ request = new MbusRequest(container, uri, msg) {
+ final ResourceReference sessionReference = session.refer();
+ @Override protected void destroy() { try (sessionReference) { super.destroy(); } }
+ };
content = request.connect(new ServerResponseHandler(msg));
} catch (RuntimeException e) {
dispatchErrorReply(msg, ErrorCode.APP_FATAL_ERROR, e.toString());
@@ -152,4 +155,5 @@ public final class MbusServer extends AbstractResource implements ServerProvider
return null;
}
}
+
}
diff --git a/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusServerTestCase.java b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusServerTestCase.java
index 5eee34ab370..58b9ebb108e 100644
--- a/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusServerTestCase.java
+++ b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusServerTestCase.java
@@ -2,13 +2,24 @@
package com.yahoo.messagebus.jdisc;
import com.google.inject.AbstractModule;
+import com.yahoo.jdisc.Container;
import com.yahoo.jdisc.Request;
import com.yahoo.jdisc.ResourceReference;
import com.yahoo.jdisc.Response;
import com.yahoo.jdisc.application.BindingSetSelector;
-import com.yahoo.jdisc.handler.*;
+import com.yahoo.jdisc.handler.AbstractRequestHandler;
+import com.yahoo.jdisc.handler.CompletionHandler;
+import com.yahoo.jdisc.handler.ContentChannel;
+import com.yahoo.jdisc.handler.RequestDeniedException;
+import com.yahoo.jdisc.handler.RequestHandler;
+import com.yahoo.jdisc.handler.ResponseHandler;
+import com.yahoo.jdisc.service.CurrentContainer;
import com.yahoo.messagebus.Error;
-import com.yahoo.messagebus.*;
+import com.yahoo.messagebus.Error;
+import com.yahoo.messagebus.ErrorCode;
+import com.yahoo.messagebus.Message;
+import com.yahoo.messagebus.MessageHandler;
+import com.yahoo.messagebus.Reply;
import com.yahoo.messagebus.jdisc.test.ServerTestDriver;
import com.yahoo.messagebus.shared.ServerSession;
import com.yahoo.messagebus.test.SimpleMessage;
@@ -23,8 +34,12 @@ import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
/**
* @author Simon Thoresen Hult
@@ -32,6 +47,50 @@ import static org.junit.Assert.*;
public class MbusServerTestCase {
@Test
+ public void requireThatServerRequestRetainsSession() {
+ AtomicReference<ContentChannel> responseChannel = new AtomicReference<>();
+ Container container = new Container() {
+ @Override public long currentTimeMillis() { return 0; }
+ @Override public void release() { }
+ @Override public RequestHandler resolveHandler(Request request) {
+ return new AbstractRequestHandler() {
+ @Override public ContentChannel handleRequest(Request request, ResponseHandler handler) {
+ responseChannel.set(handler.handleResponse(new Response(Response.Status.OK)));
+ return null;
+ }
+ };
+ }
+ @Override public <T> T getInstance(Class<T> type) { return null; }
+ };
+ CurrentContainer current = new CurrentContainer() {
+ @Override public Container newReference(URI uri) { return container; }
+ };
+
+ MySession session = new MySession();
+ assertEquals(1, session.refCount);
+ MbusServer server = new MbusServer(current, session);
+ assertEquals(2, session.refCount);
+
+ server.handleMessage(new SimpleMessage("too early"));
+ assertEquals(2, session.refCount);
+
+ server.start();
+ server.handleMessage(new SimpleMessage("retained"));
+ assertEquals(3, session.refCount);
+
+ session.release();
+ assertEquals(2, session.refCount);
+ server.destroy();
+ assertEquals(1, session.refCount);
+
+ server.handleMessage(new SimpleMessage("too late"));
+ assertEquals(1, session.refCount);
+
+ responseChannel.get().close(null);
+ assertEquals(0, session.refCount);
+ }
+
+ @Test
public void requireThatServerRetainsSession() {
MySession session = new MySession();
assertEquals(1, session.refCount);
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java b/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java
index 4a443a9fde5..b4e57876bc2 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java
@@ -267,10 +267,10 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler,
* values for the {@link IntermediateSessionParams} object.</p>
*
* @param name The local unique name for the created session.
- * @param broadcastName Whether or not to broadcast this session's name on
+ * @param broadcastName Whether to broadcast this session's name on
* the network.
* @param msgHandler The handler to receive the messages for the session.
- * @param replyHandler The handler to received the replies for the session.
+ * @param replyHandler The handler to receive the replies for the session.
* @return The created session.
*/
public IntermediateSession createIntermediateSession(String name,
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalNetwork.java b/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalNetwork.java
index 44f29df0e91..beef238288a 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalNetwork.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalNetwork.java
@@ -87,19 +87,15 @@ public class LocalNetwork implements Network {
private void receiveLater(MessageEnvelope envelope) {
byte[] payload = envelope.sender.encode(envelope.msg.getProtocol(), envelope.msg);
- executor.execute(new Runnable() {
-
- @Override
- public void run() {
- Message msg = decode(envelope.msg.getProtocol(), payload, Message.class);
- msg.getTrace().setLevel(envelope.msg.getTrace().getLevel());
- msg.setRoute(envelope.msg.getRoute()).getRoute().removeHop(0);
- msg.setRetryEnabled(envelope.msg.getRetryEnabled());
- msg.setRetry(envelope.msg.getRetry());
- msg.setTimeRemaining(envelope.msg.getTimeRemainingNow());
- msg.pushHandler(reply -> new ReplyEnvelope(LocalNetwork.this, envelope, reply).send());
- owner.deliverMessage(msg, ((LocalServiceAddress) envelope.recipient.getServiceAddress()).getSessionName());
- }
+ executor.execute(() -> {
+ Message msg = decode(envelope.msg.getProtocol(), payload, Message.class);
+ msg.getTrace().setLevel(envelope.msg.getTrace().getLevel());
+ msg.setRoute(envelope.msg.getRoute()).getRoute().removeHop(0);
+ msg.setRetryEnabled(envelope.msg.getRetryEnabled());
+ msg.setRetry(envelope.msg.getRetry());
+ msg.setTimeRemaining(envelope.msg.getTimeRemainingNow());
+ msg.pushHandler(reply -> new ReplyEnvelope(LocalNetwork.this, envelope, reply).send());
+ owner.deliverMessage(msg, ((LocalServiceAddress) envelope.recipient.getServiceAddress()).getSessionName());
});
}