From 8fd1e2515449912e4594e09184357df67048b274 Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Fri, 24 Sep 2021 09:46:53 +0200 Subject: Stopping is server is a one way street. Returning BUSY as a temporary error will cause retries, instead of fast-fail. --- .../com/yahoo/messagebus/jdisc/MbusClient.java | 6 ++++- .../com/yahoo/messagebus/jdisc/MbusServer.java | 28 +++++++++++++++------- .../com/yahoo/messagebus/shared/ClientSession.java | 2 +- .../jdisc/MbusServerConformanceTest.java | 3 ++- .../com/yahoo/jdisc/service/ClientProvider.java | 6 ++++- .../com/yahoo/jdisc/service/ServerProvider.java | 4 ++-- 6 files changed, 35 insertions(+), 14 deletions(-) diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusClient.java b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusClient.java index 922e4140868..009d1619503 100644 --- a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusClient.java +++ b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusClient.java @@ -9,6 +9,8 @@ import com.yahoo.jdisc.handler.ContentChannel; import com.yahoo.jdisc.handler.RequestDeniedException; import com.yahoo.jdisc.handler.ResponseHandler; import com.yahoo.jdisc.service.ClientProvider; + +import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Level; import com.yahoo.messagebus.EmptyReply; import com.yahoo.messagebus.Error; @@ -29,9 +31,10 @@ import java.util.logging.Logger; public final class MbusClient extends AbstractResource implements ClientProvider, ReplyHandler { private static final Logger log = Logger.getLogger(MbusClient.class.getName()); + private static final AtomicInteger threadId = new AtomicInteger(0); private final BlockingQueue queue = new LinkedBlockingQueue<>(); private final ClientSession session; - private final Thread thread = new Thread(new SenderTask(), "MbusClient"); + private final Thread thread; private volatile boolean done = false; private final ResourceReference sessionReference; @@ -39,6 +42,7 @@ public final class MbusClient extends AbstractResource implements ClientProvider public MbusClient(ClientSession session) { this.session = session; this.sessionReference = session.refer(); + thread = new Thread(new SenderTask(), "mbus-client-" + threadId.getAndIncrement()); } @Override diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusServer.java b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusServer.java index 67badddddd2..f8e50845bc8 100644 --- a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusServer.java +++ b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusServer.java @@ -10,13 +10,19 @@ import com.yahoo.jdisc.handler.ContentChannel; import com.yahoo.jdisc.handler.ResponseHandler; import com.yahoo.jdisc.service.CurrentContainer; import com.yahoo.jdisc.service.ServerProvider; + +import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Level; -import com.yahoo.messagebus.*; + +import com.yahoo.messagebus.EmptyReply; import com.yahoo.messagebus.Error; +import com.yahoo.messagebus.ErrorCode; +import com.yahoo.messagebus.Message; +import com.yahoo.messagebus.MessageHandler; +import com.yahoo.messagebus.Reply; import com.yahoo.messagebus.shared.ServerSession; import java.net.URI; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Logger; /** @@ -24,8 +30,9 @@ import java.util.logging.Logger; */ public final class MbusServer extends AbstractResource implements ServerProvider, MessageHandler { + private enum State {INITIALIZING, RUNNING, STOPPED} private final static Logger log = Logger.getLogger(MbusServer.class.getName()); - private final AtomicBoolean running = new AtomicBoolean(false); + private final AtomicReference runState = new AtomicReference<>(State.INITIALIZING); private final CurrentContainer container; private final ServerSession session; private final URI uri; @@ -44,26 +51,31 @@ public final class MbusServer extends AbstractResource implements ServerProvider public void start() { log.log(Level.FINE, "Starting message bus server."); session.connect(); - running.set(true); + runState.set(State.RUNNING); } @Override public void close() { log.log(Level.FINE, "Closing message bus server."); - running.set(false); + runState.set(State.STOPPED); } @Override protected void destroy() { log.log(Level.FINE, "Destroying message bus server."); - running.set(false); + runState.set(State.STOPPED); sessionReference.close(); } @Override public void handleMessage(Message msg) { - if (!running.get()) { - dispatchErrorReply(msg, ErrorCode.SESSION_BUSY, "Session temporarily closed."); + State state = runState.get(); + if (state == State.INITIALIZING) { + dispatchErrorReply(msg, ErrorCode.SESSION_BUSY, "MBusServer not started."); + return; + } + if (state == State.STOPPED) { + dispatchErrorReply(msg, ErrorCode.SEND_QUEUE_CLOSED, "MBusServer has been closed."); return; } if (msg.getTrace().shouldTrace(6)) { diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/shared/ClientSession.java b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/ClientSession.java index 0964a254cf2..22322a49a78 100644 --- a/container-messagebus/src/main/java/com/yahoo/messagebus/shared/ClientSession.java +++ b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/ClientSession.java @@ -10,5 +10,5 @@ import com.yahoo.messagebus.Result; */ public interface ClientSession extends SharedResource { - public Result sendMessage(Message msg); + Result sendMessage(Message msg); } diff --git a/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusServerConformanceTest.java b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusServerConformanceTest.java index bf89f3869ed..2bdc15002ad 100644 --- a/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusServerConformanceTest.java +++ b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusServerConformanceTest.java @@ -31,6 +31,7 @@ import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; import static com.yahoo.messagebus.ErrorCode.APP_FATAL_ERROR; +import static com.yahoo.messagebus.ErrorCode.SEND_QUEUE_CLOSED; import static com.yahoo.messagebus.ErrorCode.SESSION_BUSY; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.notNullValue; @@ -50,7 +51,7 @@ public class MbusServerConformanceTest extends ServerProviderConformanceTest { @Test public void testContainerNotReadyException() throws Throwable { new TestRunner().setRequestTimeout(100, TimeUnit.MILLISECONDS) - .expectError(is(SESSION_BUSY)) + .expectError(is(SEND_QUEUE_CLOSED)) .executeAndClose(); } diff --git a/jdisc_core/src/main/java/com/yahoo/jdisc/service/ClientProvider.java b/jdisc_core/src/main/java/com/yahoo/jdisc/service/ClientProvider.java index c178147a952..a47c6b06321 100644 --- a/jdisc_core/src/main/java/com/yahoo/jdisc/service/ClientProvider.java +++ b/jdisc_core/src/main/java/com/yahoo/jdisc/service/ClientProvider.java @@ -2,7 +2,11 @@ package com.yahoo.jdisc.service; import com.yahoo.jdisc.Container; -import com.yahoo.jdisc.application.*; +import com.yahoo.jdisc.application.BindingRepository; +import com.yahoo.jdisc.application.ContainerActivator; +import com.yahoo.jdisc.application.ContainerBuilder; +import com.yahoo.jdisc.application.Application; +import com.yahoo.jdisc.application.UriPattern; import com.yahoo.jdisc.handler.RequestHandler; /** diff --git a/jdisc_core/src/main/java/com/yahoo/jdisc/service/ServerProvider.java b/jdisc_core/src/main/java/com/yahoo/jdisc/service/ServerProvider.java index b58f3bc5138..3b5cbfd9cbc 100644 --- a/jdisc_core/src/main/java/com/yahoo/jdisc/service/ServerProvider.java +++ b/jdisc_core/src/main/java/com/yahoo/jdisc/service/ServerProvider.java @@ -37,7 +37,7 @@ public interface ServerProvider extends SharedResource { * {@link Container} does not call this method, instead it is a required step in the {@link Application} * initialization code.

*/ - public void start(); + void start(); /** *

This is a synchronous method to close the listen port (or equivalent) of this ServerProvider and flush any @@ -48,5 +48,5 @@ public interface ServerProvider extends SharedResource { *

The {@link Container} does not call this method, instead it is a required step in the {@link * Application} shutdown code.

*/ - public void close(); + void close(); } -- cgit v1.2.3