summaryrefslogtreecommitdiffstats
path: root/container-messagebus
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2021-09-24 09:46:53 +0200
committerHenning Baldersheim <balder@yahoo-inc.com>2021-09-24 09:47:17 +0200
commit8fd1e2515449912e4594e09184357df67048b274 (patch)
tree74ff332edbe6d1d50a32c024bfb903ae4568f96a /container-messagebus
parent8b5c2f0197ec501e97a2b31dfea2e0fe3ac1ef85 (diff)
Stopping is server is a one way street.
Returning BUSY as a temporary error will cause retries, instead of fast-fail.
Diffstat (limited to 'container-messagebus')
-rw-r--r--container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusClient.java6
-rw-r--r--container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusServer.java28
-rw-r--r--container-messagebus/src/main/java/com/yahoo/messagebus/shared/ClientSession.java2
-rw-r--r--container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusServerConformanceTest.java3
4 files changed, 28 insertions, 11 deletions
diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusClient.java b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusClient.java
index 922e4140868..009d1619503 100644
--- a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusClient.java
+++ b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusClient.java
@@ -9,6 +9,8 @@ import com.yahoo.jdisc.handler.ContentChannel;
import com.yahoo.jdisc.handler.RequestDeniedException;
import com.yahoo.jdisc.handler.ResponseHandler;
import com.yahoo.jdisc.service.ClientProvider;
+
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import com.yahoo.messagebus.EmptyReply;
import com.yahoo.messagebus.Error;
@@ -29,9 +31,10 @@ import java.util.logging.Logger;
public final class MbusClient extends AbstractResource implements ClientProvider, ReplyHandler {
private static final Logger log = Logger.getLogger(MbusClient.class.getName());
+ private static final AtomicInteger threadId = new AtomicInteger(0);
private final BlockingQueue<MbusRequest> queue = new LinkedBlockingQueue<>();
private final ClientSession session;
- private final Thread thread = new Thread(new SenderTask(), "MbusClient");
+ private final Thread thread;
private volatile boolean done = false;
private final ResourceReference sessionReference;
@@ -39,6 +42,7 @@ public final class MbusClient extends AbstractResource implements ClientProvider
public MbusClient(ClientSession session) {
this.session = session;
this.sessionReference = session.refer();
+ thread = new Thread(new SenderTask(), "mbus-client-" + threadId.getAndIncrement());
}
@Override
diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusServer.java b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusServer.java
index 67badddddd2..f8e50845bc8 100644
--- a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusServer.java
+++ b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusServer.java
@@ -10,13 +10,19 @@ import com.yahoo.jdisc.handler.ContentChannel;
import com.yahoo.jdisc.handler.ResponseHandler;
import com.yahoo.jdisc.service.CurrentContainer;
import com.yahoo.jdisc.service.ServerProvider;
+
+import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
-import com.yahoo.messagebus.*;
+
+import com.yahoo.messagebus.EmptyReply;
import com.yahoo.messagebus.Error;
+import com.yahoo.messagebus.ErrorCode;
+import com.yahoo.messagebus.Message;
+import com.yahoo.messagebus.MessageHandler;
+import com.yahoo.messagebus.Reply;
import com.yahoo.messagebus.shared.ServerSession;
import java.net.URI;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
/**
@@ -24,8 +30,9 @@ import java.util.logging.Logger;
*/
public final class MbusServer extends AbstractResource implements ServerProvider, MessageHandler {
+ private enum State {INITIALIZING, RUNNING, STOPPED}
private final static Logger log = Logger.getLogger(MbusServer.class.getName());
- private final AtomicBoolean running = new AtomicBoolean(false);
+ private final AtomicReference<State> runState = new AtomicReference<>(State.INITIALIZING);
private final CurrentContainer container;
private final ServerSession session;
private final URI uri;
@@ -44,26 +51,31 @@ public final class MbusServer extends AbstractResource implements ServerProvider
public void start() {
log.log(Level.FINE, "Starting message bus server.");
session.connect();
- running.set(true);
+ runState.set(State.RUNNING);
}
@Override
public void close() {
log.log(Level.FINE, "Closing message bus server.");
- running.set(false);
+ runState.set(State.STOPPED);
}
@Override
protected void destroy() {
log.log(Level.FINE, "Destroying message bus server.");
- running.set(false);
+ runState.set(State.STOPPED);
sessionReference.close();
}
@Override
public void handleMessage(Message msg) {
- if (!running.get()) {
- dispatchErrorReply(msg, ErrorCode.SESSION_BUSY, "Session temporarily closed.");
+ State state = runState.get();
+ if (state == State.INITIALIZING) {
+ dispatchErrorReply(msg, ErrorCode.SESSION_BUSY, "MBusServer not started.");
+ return;
+ }
+ if (state == State.STOPPED) {
+ dispatchErrorReply(msg, ErrorCode.SEND_QUEUE_CLOSED, "MBusServer has been closed.");
return;
}
if (msg.getTrace().shouldTrace(6)) {
diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/shared/ClientSession.java b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/ClientSession.java
index 0964a254cf2..22322a49a78 100644
--- a/container-messagebus/src/main/java/com/yahoo/messagebus/shared/ClientSession.java
+++ b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/ClientSession.java
@@ -10,5 +10,5 @@ import com.yahoo.messagebus.Result;
*/
public interface ClientSession extends SharedResource {
- public Result sendMessage(Message msg);
+ Result sendMessage(Message msg);
}
diff --git a/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusServerConformanceTest.java b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusServerConformanceTest.java
index bf89f3869ed..2bdc15002ad 100644
--- a/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusServerConformanceTest.java
+++ b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusServerConformanceTest.java
@@ -31,6 +31,7 @@ import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import static com.yahoo.messagebus.ErrorCode.APP_FATAL_ERROR;
+import static com.yahoo.messagebus.ErrorCode.SEND_QUEUE_CLOSED;
import static com.yahoo.messagebus.ErrorCode.SESSION_BUSY;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.notNullValue;
@@ -50,7 +51,7 @@ public class MbusServerConformanceTest extends ServerProviderConformanceTest {
@Test
public void testContainerNotReadyException() throws Throwable {
new TestRunner().setRequestTimeout(100, TimeUnit.MILLISECONDS)
- .expectError(is(SESSION_BUSY))
+ .expectError(is(SEND_QUEUE_CLOSED))
.executeAndClose();
}