diff options
author | gjoranv <gv@verizonmedia.com> | 2021-03-29 21:14:20 +0200 |
---|---|---|
committer | gjoranv <gv@verizonmedia.com> | 2021-03-29 21:14:20 +0200 |
commit | 2c7ee7bc9d3e588f11ce23d66f8c111109c8a2fc (patch) | |
tree | 148ff85b5ea211859cdc863ab24fe568060e78d0 /container-messagebus/src | |
parent | 09cbb21dbfba2c80660945c84b1b2d9a0fffaf24 (diff) |
Add source code from jdisc_mbus_service into container-messagebus.
Diffstat (limited to 'container-messagebus/src')
41 files changed, 3914 insertions, 0 deletions
diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/IgnoredCompletionHandler.java b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/IgnoredCompletionHandler.java new file mode 100644 index 00000000000..c64fea8653b --- /dev/null +++ b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/IgnoredCompletionHandler.java @@ -0,0 +1,22 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.jdisc; + +import com.yahoo.jdisc.handler.CompletionHandler; + +/** + * @author Simon Thoresen Hult + */ +enum IgnoredCompletionHandler implements CompletionHandler { + + INSTANCE; + + @Override + public void completed() { + + } + + @Override + public void failed(final Throwable t) { + + } +} diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusClient.java b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusClient.java new file mode 100644 index 00000000000..922e4140868 --- /dev/null +++ b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusClient.java @@ -0,0 +1,147 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.jdisc; + +import com.google.inject.Inject; +import com.yahoo.jdisc.AbstractResource; +import com.yahoo.jdisc.Request; +import com.yahoo.jdisc.ResourceReference; +import com.yahoo.jdisc.handler.ContentChannel; +import com.yahoo.jdisc.handler.RequestDeniedException; +import com.yahoo.jdisc.handler.ResponseHandler; +import com.yahoo.jdisc.service.ClientProvider; +import java.util.logging.Level; +import com.yahoo.messagebus.EmptyReply; +import com.yahoo.messagebus.Error; +import com.yahoo.messagebus.ErrorCode; +import com.yahoo.messagebus.Message; +import com.yahoo.messagebus.Reply; +import com.yahoo.messagebus.ReplyHandler; +import com.yahoo.messagebus.shared.ClientSession; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.logging.Logger; + +/** + * @author Simon Thoresen Hult + */ +public final class MbusClient extends AbstractResource implements ClientProvider, ReplyHandler { + + private static final Logger log = Logger.getLogger(MbusClient.class.getName()); + private final BlockingQueue<MbusRequest> queue = new LinkedBlockingQueue<>(); + private final ClientSession session; + private final Thread thread = new Thread(new SenderTask(), "MbusClient"); + private volatile boolean done = false; + private final ResourceReference sessionReference; + + @Inject + public MbusClient(ClientSession session) { + this.session = session; + this.sessionReference = session.refer(); + } + + @Override + public void start() { + thread.start(); + } + + @Override + public ContentChannel handleRequest(Request request, ResponseHandler handler) { + if (!(request instanceof MbusRequest)) { + throw new RequestDeniedException(request); + } + final Message msg = ((MbusRequest)request).getMessage(); + msg.getTrace().trace(6, "Request received by MbusClient."); + msg.pushHandler(null); // save user context + final Long timeout = request.timeRemaining(TimeUnit.MILLISECONDS); + if (timeout != null) { + msg.setTimeReceivedNow(); + msg.setTimeRemaining(Math.max(1, timeout)); // negative or zero timeout has semantics + } + msg.setContext(handler); + msg.pushHandler(this); + sendBlocking((MbusRequest)request); + return null; + } + + @Override + public void handleTimeout(Request request, final ResponseHandler handler) { + // ignore, mbus has guaranteed reply + } + + @Override + protected void destroy() { + log.log(Level.FINE, "Destroying message bus client."); + sessionReference.close(); + done = true; + } + + @Override + public void handleReply(final Reply reply) { + reply.getTrace().trace(6, "Reply received by MbusClient."); + final ResponseHandler handler = (ResponseHandler)reply.getContext(); + reply.popHandler(); // restore user context + try { + handler.handleResponse(new MbusResponse(StatusCodes.fromMbusReply(reply), reply)) + .close(IgnoredCompletionHandler.INSTANCE); + } catch (final Exception e) { + log.log(Level.WARNING, "Ignoring exception thrown by ResponseHandler.", e); + } + } + + private void sendBlocking(MbusRequest request) { + while (!sendMessage(request)) { + try { + Thread.sleep(5); + } catch (final InterruptedException e) { + // ignore + } + } + } + + private boolean sendMessage(MbusRequest request) { + Error error; + final Long millis = request.timeRemaining(TimeUnit.MILLISECONDS); + if (millis != null && millis <= 0) { + error = new Error(ErrorCode.TIMEOUT, request.getTimeout(TimeUnit.MILLISECONDS) + " millis"); + } else if (request.isCancelled()) { + error = new Error(ErrorCode.APP_FATAL_ERROR, "request cancelled"); + } else { + try { + error = session.sendMessage(request.getMessage()).getError(); + } catch (final Exception e) { + error = new Error(ErrorCode.FATAL_ERROR, e.toString()); + } + } + if (error == null) { + return true; + } + if (error.isFatal()) { + final Reply reply = new EmptyReply(); + reply.swapState(request.getMessage()); + reply.addError(error); + reply.popHandler().handleReply(reply); + return true; + } + return false; + } + + private class SenderTask implements Runnable { + + @Override + public void run() { + while (!done) { + try { + final MbusRequest request = queue.poll(100, TimeUnit.MILLISECONDS); + if (request == null) { + continue; + } + sendBlocking(request); + } catch (final Exception e) { + log.log(Level.WARNING, "Ignoring exception thrown by MbusClient.", e); + } + } + } + } +} diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusRequest.java b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusRequest.java new file mode 100644 index 00000000000..a0bedd678eb --- /dev/null +++ b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusRequest.java @@ -0,0 +1,38 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.jdisc; + +import com.yahoo.jdisc.Request; +import com.yahoo.jdisc.service.CurrentContainer; +import com.yahoo.messagebus.Message; + +import java.net.URI; + +/** + * @author Simon Thoresen Hult + */ +public class MbusRequest extends Request { + + private final Message message; + + public MbusRequest(CurrentContainer current, URI uri, Message msg) { + super(current, uri); + this.message = validateMessage(msg); + } + + public MbusRequest(Request parent, URI uri, Message msg) { + super(parent, uri); + this.message = validateMessage(msg); + } + + public Message getMessage() { + return message; + } + + private Message validateMessage(Message msg) { + if (msg != null) { + return msg; + } + release(); + throw new NullPointerException(); + } +} diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusRequestHandler.java b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusRequestHandler.java new file mode 100644 index 00000000000..fb5657a9215 --- /dev/null +++ b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusRequestHandler.java @@ -0,0 +1,59 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.jdisc; + +import com.yahoo.jdisc.Request; +import com.yahoo.jdisc.handler.AbstractRequestHandler; +import com.yahoo.jdisc.handler.CompletionHandler; +import com.yahoo.jdisc.handler.ContentChannel; +import com.yahoo.jdisc.handler.ResponseHandler; +import com.yahoo.messagebus.Message; +import com.yahoo.messagebus.MessageHandler; +import com.yahoo.messagebus.Reply; +import com.yahoo.messagebus.ReplyHandler; + +/** + * @author Simon Thoresen Hult + */ +public abstract class MbusRequestHandler extends AbstractRequestHandler implements MessageHandler { + + @Override + public ContentChannel handleRequest(final Request request, final ResponseHandler handler) { + if (!(request instanceof MbusRequest)) { + throw new UnsupportedOperationException("Expected MbusRequest, got " + request.getClass().getName() + "."); + } + final Message msg = ((MbusRequest)request).getMessage(); + msg.pushHandler(new RespondingReplyHandler(handler)); + handleMessage(msg); + return null; + } + + private static class RespondingReplyHandler implements ReplyHandler { + + private final ResponseHandler handler; + + RespondingReplyHandler(final ResponseHandler handler) { + this.handler = handler; + } + + @Override + public void handleReply(final Reply reply) { + final MbusResponse response = new MbusResponse(StatusCodes.fromMbusReply(reply), reply); + handler.handleResponse(response).close(IgnoringCompletionHandler.INSTANCE); + } + } + + private static class IgnoringCompletionHandler implements CompletionHandler { + + public static final IgnoringCompletionHandler INSTANCE = new IgnoringCompletionHandler(); + + @Override + public void completed() { + + } + + @Override + public void failed(final Throwable t) { + + } + } +} diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusResponse.java b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusResponse.java new file mode 100644 index 00000000000..37da4d8569f --- /dev/null +++ b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusResponse.java @@ -0,0 +1,25 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.jdisc; + +import com.yahoo.jdisc.Response; +import com.yahoo.messagebus.Reply; + +/** + * @author Simon Thoresen Hult + */ +public class MbusResponse extends Response { + + private final Reply reply; + + public MbusResponse(int status, Reply reply) { + super(status); + if (reply == null) { + throw new NullPointerException(); + } + this.reply = reply; + } + + public Reply getReply() { + return reply; + } +} diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusServer.java b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusServer.java new file mode 100644 index 00000000000..e26e1e7e134 --- /dev/null +++ b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusServer.java @@ -0,0 +1,135 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.jdisc; + +import com.google.inject.Inject; +import com.yahoo.jdisc.AbstractResource; +import com.yahoo.jdisc.Request; +import com.yahoo.jdisc.ResourceReference; +import com.yahoo.jdisc.Response; +import com.yahoo.jdisc.handler.ContentChannel; +import com.yahoo.jdisc.handler.ResponseHandler; +import com.yahoo.jdisc.service.CurrentContainer; +import com.yahoo.jdisc.service.ServerProvider; +import java.util.logging.Level; +import com.yahoo.messagebus.*; +import com.yahoo.messagebus.Error; +import com.yahoo.messagebus.shared.ServerSession; + +import java.net.URI; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.logging.Logger; + +/** + * @author Simon Thoresen Hult + */ +public final class MbusServer extends AbstractResource implements ServerProvider, MessageHandler { + + private final static Logger log = Logger.getLogger(MbusServer.class.getName()); + private final AtomicBoolean running = new AtomicBoolean(false); + private final CurrentContainer container; + private final ServerSession session; + private final URI uri; + private final ResourceReference sessionReference; + + @Inject + public MbusServer(CurrentContainer container, ServerSession session) { + this.container = container; + this.session = session; + uri = URI.create("mbus://localhost/" + session.name()); + session.setMessageHandler(this); + sessionReference = session.refer(); + } + + @Override + public void start() { + log.log(Level.FINE, "Starting message bus server."); + running.set(true); + } + + @Override + public void close() { + log.log(Level.FINE, "Closing message bus server."); + running.set(false); + } + + @Override + protected void destroy() { + log.log(Level.FINE, "Destroying message bus server."); + running.set(false); + sessionReference.close(); + } + + @Override + public void handleMessage(Message msg) { + if (!running.get()) { + dispatchErrorReply(msg, ErrorCode.SESSION_BUSY, "Session temporarily closed."); + return; + } + if (msg.getTrace().shouldTrace(6)) { + msg.getTrace().trace(6, "Message received by MbusServer."); + } + Request request = null; + ContentChannel content = null; + try { + request = new MbusRequest(container, uri, msg); + content = request.connect(new ServerResponseHandler(msg)); + } catch (RuntimeException e) { + dispatchErrorReply(msg, ErrorCode.APP_FATAL_ERROR, e.toString()); + } finally { + if (request != null) { + request.release(); + } + } + if (content != null) { + content.close(IgnoredCompletionHandler.INSTANCE); + } + } + + public String connectionSpec() { + return session.connectionSpec(); + } + + private void dispatchErrorReply(Message msg, int errCode, String errMsg) { + Reply reply = new EmptyReply(); + reply.swapState(msg); + reply.addError(new Error(errCode, errMsg)); + session.sendReply(reply); + } + + private class ServerResponseHandler implements ResponseHandler { + + final Message msg; + + ServerResponseHandler(Message msg) { + this.msg = msg; + } + + @Override + public ContentChannel handleResponse(Response response) { + Reply reply; + if (response instanceof MbusResponse) { + reply = ((MbusResponse)response).getReply(); + } else { + reply = new EmptyReply(); + reply.swapState(msg); + } + Error err = StatusCodes.toMbusError(response.getStatus()); + if (err != null) { + if (err.isFatal()) { + if (!reply.hasFatalErrors()) { + reply.addError(err); + } + } else { + if (!reply.hasErrors()) { + reply.addError(err); + } + } + } + if (reply.getTrace().shouldTrace(6)) { + reply.getTrace().trace(6, "Sending reply from MbusServer."); + } + session.sendReply(reply); + return null; + } + } +} diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/StatusCodes.java b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/StatusCodes.java new file mode 100644 index 00000000000..6570c910af3 --- /dev/null +++ b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/StatusCodes.java @@ -0,0 +1,77 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.jdisc; + +import com.yahoo.jdisc.Response; +import com.yahoo.messagebus.Error; +import com.yahoo.messagebus.ErrorCode; +import com.yahoo.messagebus.Reply; + +/** + * @author Simon Thoresen Hult + */ +public class StatusCodes { + + public static int fromMbusReply(final Reply reply) { + int statusCode = Response.Status.OK; + for (int i = 0, len = reply.getNumErrors(); i < len; ++i) { + statusCode = Math.max(statusCode, fromMbusError(reply.getError(i))); + } + return statusCode; + } + + public static int fromMbusError(final Error error) { + final int errorCode = error.getCode(); + if (errorCode < ErrorCode.TRANSIENT_ERROR) { + return Response.Status.OK; + } + if (errorCode < ErrorCode.FATAL_ERROR) { + return Response.Status.TEMPORARY_REDIRECT; + } + switch (errorCode) { + case ErrorCode.SEND_QUEUE_CLOSED: + return Response.Status.LOCKED; + case ErrorCode.ILLEGAL_ROUTE: + return Response.Status.BAD_REQUEST; + case ErrorCode.NO_SERVICES_FOR_ROUTE: + return Response.Status.NOT_FOUND; + case ErrorCode.ENCODE_ERROR: + return Response.Status.BAD_REQUEST; + case ErrorCode.NETWORK_ERROR: + return Response.Status.BAD_REQUEST; // got nothing better + case ErrorCode.UNKNOWN_PROTOCOL: + return Response.Status.UNSUPPORTED_MEDIA_TYPE; + case ErrorCode.DECODE_ERROR: + return Response.Status.UNSUPPORTED_MEDIA_TYPE; + case ErrorCode.TIMEOUT: + return Response.Status.REQUEST_TIMEOUT; + case ErrorCode.INCOMPATIBLE_VERSION: + return Response.Status.VERSION_NOT_SUPPORTED; + case ErrorCode.UNKNOWN_POLICY: + return Response.Status.BAD_REQUEST; + case ErrorCode.NETWORK_SHUTDOWN: + return Response.Status.LOCKED; + case ErrorCode.POLICY_ERROR: + return Response.Status.PRECONDITION_FAILED; + case ErrorCode.SEQUENCE_ERROR: + return Response.Status.PRECONDITION_FAILED; + case ErrorCode.APP_FATAL_ERROR: + return Response.Status.INTERNAL_SERVER_ERROR; + default: + return Response.Status.INTERNAL_SERVER_ERROR; + } + } + + public static Error toMbusError(final int statusCode) { + if (statusCode < 300) { + return null; + } else if (statusCode < 400) { + return new Error(ErrorCode.APP_TRANSIENT_ERROR, statusCode + " Redirection"); + } else if (statusCode < 500) { + return new Error(ErrorCode.APP_FATAL_ERROR, statusCode + " Client Error"); + } else if (statusCode < 600) { + return new Error(ErrorCode.APP_FATAL_ERROR, statusCode + " Server Error"); + } else { + return new Error(ErrorCode.APP_FATAL_ERROR, statusCode + " Unknown Error"); + } + } +} diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/package-info.java b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/package-info.java new file mode 100644 index 00000000000..9aea8cf7db8 --- /dev/null +++ b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/package-info.java @@ -0,0 +1,5 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +@ExportPackage +package com.yahoo.messagebus.jdisc; + +import com.yahoo.osgi.annotation.ExportPackage; diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/ClientTestDriver.java b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/ClientTestDriver.java new file mode 100644 index 00000000000..111805d61b0 --- /dev/null +++ b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/ClientTestDriver.java @@ -0,0 +1,134 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.jdisc.test; + +import com.yahoo.jdisc.References; +import com.yahoo.jdisc.Request; +import com.yahoo.jdisc.ResourceReference; +import com.yahoo.jdisc.application.ContainerBuilder; +import com.yahoo.jdisc.handler.ResponseHandler; +import com.yahoo.jdisc.test.TestDriver; +import com.yahoo.messagebus.*; +import com.yahoo.messagebus.jdisc.MbusClient; +import com.yahoo.messagebus.jdisc.MbusRequest; +import com.yahoo.messagebus.network.rpc.RPCNetworkParams; +import com.yahoo.messagebus.routing.Route; +import com.yahoo.messagebus.shared.SharedMessageBus; +import com.yahoo.messagebus.shared.SharedSourceSession; +import com.yahoo.messagebus.test.SimpleProtocol; + +import java.net.URI; +import java.util.concurrent.TimeUnit; + +/** + * @author Simon Thoresen Hult + */ +public class ClientTestDriver { + + private final RemoteServer server; + private final MbusClient client; + private final SharedSourceSession session; + private final TestDriver driver; + + private ClientTestDriver(RemoteServer server, Protocol protocol) { + this.server = server; + + MessageBusParams mbusParams = new MessageBusParams().addProtocol(protocol); + RPCNetworkParams netParams = new RPCNetworkParams().setSlobrokConfigId(server.slobrokId()); + SharedMessageBus mbus = SharedMessageBus.newInstance(mbusParams, netParams); + session = mbus.newSourceSession(new SourceSessionParams()); + client = new MbusClient(session); + client.start(); + mbus.release(); + + driver = TestDriver.newSimpleApplicationInstanceWithoutOsgi(); + ContainerBuilder builder = driver.newContainerBuilder(); + builder.clientBindings().bind("mbus://*/*", client); + driver.activateContainer(builder); + } + + public SourceSession sourceSession() { + return session.session(); + } + + public Request newServerRequest() { + return new Request(driver, URI.create("mbus://localhost/")); + } + + public Request newClientRequest(Message msg) { + msg.setRoute(Route.parse(server.connectionSpec())); + if (msg.getTrace().getLevel() == 0) { + msg.getTrace().setLevel(9); + } + final Request parent = newServerRequest(); + try (final ResourceReference ref = References.fromResource(parent)) { + return new MbusRequest(parent, URI.create("mbus://remotehost/"), msg); + } + } + + public boolean sendRequest(Request request, ResponseHandler responseHandler) { + request.connect(responseHandler).close(null); + return true; + } + + public boolean sendMessage(Message msg, ResponseHandler responseHandler) { + final Request request = newClientRequest(msg); + try (final ResourceReference ref = References.fromResource(request)) { + return sendRequest(request, responseHandler); + } + } + + public Message awaitMessage() { + Message msg = null; + try { + msg = server.awaitMessage(60, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + if (msg != null) { + msg.getTrace().trace(0, "Message received by RemoteServer."); + } + return msg; + } + + public void sendReply(Reply reply) { + reply.getTrace().trace(0, "Sending reply from RemoteServer."); + server.sendReply(reply); + } + + public boolean awaitMessageAndSendReply(Reply reply) { + Message msg = awaitMessage(); + if (msg == null) { + return false; + } + reply.swapState(msg); + sendReply(reply); + return true; + } + + public boolean close() { + session.release(); + client.release(); + server.close(); + return driver.close(); + } + + public MbusClient client() { + return client; + } + + public RemoteServer server() { + return server; + } + + public static ClientTestDriver newInstance() { + return new ClientTestDriver(RemoteServer.newInstanceWithInternSlobrok(), new SimpleProtocol()); + } + + public static ClientTestDriver newInstanceWithProtocol(Protocol protocol) { + return new ClientTestDriver(RemoteServer.newInstanceWithInternSlobrok(), protocol); + } + + public static ClientTestDriver newInstanceWithExternSlobrok(String slobrokId) { + return new ClientTestDriver(RemoteServer.newInstanceWithExternSlobrok(slobrokId), new SimpleProtocol()); + } +} diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/MessageQueue.java b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/MessageQueue.java new file mode 100644 index 00000000000..c5287165e27 --- /dev/null +++ b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/MessageQueue.java @@ -0,0 +1,27 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.jdisc.test; + +import com.yahoo.messagebus.Message; +import com.yahoo.messagebus.MessageHandler; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +/** + * @author Simon Thoresen Hult + */ +public class MessageQueue implements MessageHandler { + + private final BlockingQueue<Message> queue = new LinkedBlockingQueue<>(); + + @Override + public void handleMessage(Message msg) { + queue.add(msg); + } + + public Message awaitMessage(int timeout, TimeUnit unit) throws InterruptedException { + return queue.poll(timeout, unit); + } + +} diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteClient.java b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteClient.java new file mode 100644 index 00000000000..57d0abd980b --- /dev/null +++ b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteClient.java @@ -0,0 +1,76 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.jdisc.test; + +import com.yahoo.jrt.ListenFailedException; +import com.yahoo.jrt.slobrok.server.Slobrok; +import com.yahoo.messagebus.*; +import com.yahoo.messagebus.network.local.LocalNetwork; +import com.yahoo.messagebus.network.rpc.RPCNetwork; +import com.yahoo.messagebus.network.rpc.RPCNetworkParams; +import com.yahoo.messagebus.test.SimpleProtocol; + +import java.util.concurrent.TimeUnit; + +/** + * @author Simon Thoresen Hult + */ +public class RemoteClient { + + private final Slobrok slobrok; + private final String slobrokId; + private final MessageBus mbus; + private final ReplyQueue queue = new ReplyQueue(); + private final SourceSession session; + + private RemoteClient(Slobrok slobrok, String slobrokId, Protocol protocol, boolean network) { + this.slobrok = slobrok; + this.slobrokId = slobrok != null ? slobrok.configId() : slobrokId; + mbus = network + ? new MessageBus(new RPCNetwork(new RPCNetworkParams().setSlobrokConfigId(this.slobrokId)), + new MessageBusParams().addProtocol(protocol)) + : new MessageBus(new LocalNetwork(), new MessageBusParams().addProtocol(protocol)); + session = mbus.createSourceSession(new SourceSessionParams().setThrottlePolicy(null).setReplyHandler(queue)); + } + + public Result sendMessage(Message msg) { + return session.send(msg); + } + + public Reply awaitReply(int timeout, TimeUnit unit) throws InterruptedException { + return queue.awaitReply(timeout, unit); + } + + public String slobrokId() { + return slobrokId; + } + + public void close() { + session.destroy(); + mbus.destroy(); + if (slobrok != null) { + slobrok.stop(); + } + } + + public static RemoteClient newInstanceWithInternSlobrok(boolean network) { + return new RemoteClient(newSlobrok(), null, new SimpleProtocol(), network); + } + + public static RemoteClient newInstanceWithExternSlobrok(String slobrokId, boolean network) { + return new RemoteClient(null, slobrokId, new SimpleProtocol(), network); + } + + public static RemoteClient newInstanceWithProtocolAndInternSlobrok(Protocol protocol, boolean network) { + return new RemoteClient(newSlobrok(), null, protocol, network); + } + + private static Slobrok newSlobrok() { + Slobrok slobrok; + try { + slobrok = new Slobrok(); + } catch (ListenFailedException e) { + throw new IllegalStateException(e); + } + return slobrok; + } +} diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteServer.java b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteServer.java new file mode 100644 index 00000000000..1f0f82c4903 --- /dev/null +++ b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteServer.java @@ -0,0 +1,87 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.jdisc.test; + +import com.yahoo.jrt.ListenFailedException; +import com.yahoo.jrt.slobrok.server.Slobrok; +import com.yahoo.messagebus.*; +import com.yahoo.messagebus.network.Identity; +import com.yahoo.messagebus.network.rpc.RPCNetwork; +import com.yahoo.messagebus.network.rpc.RPCNetworkParams; +import com.yahoo.messagebus.test.SimpleProtocol; + +import java.util.concurrent.TimeUnit; + +/** + * @author Simon Thoresen Hult + */ +public class RemoteServer { + + private final Slobrok slobrok; + private final String slobrokId; + private final MessageBus mbus; + private final MessageQueue queue = new MessageQueue(); + private final DestinationSession session; + + private RemoteServer(Slobrok slobrok, String slobrokId, Protocol protocol, String identity) { + this.slobrok = slobrok; + this.slobrokId = slobrok != null ? slobrok.configId() : slobrokId; + mbus = new MessageBus(new RPCNetwork(new RPCNetworkParams() + .setSlobrokConfigId(this.slobrokId) + .setIdentity(new Identity(identity))), + new MessageBusParams().addProtocol(protocol)); + session = mbus.createDestinationSession(new DestinationSessionParams().setMessageHandler(queue)); + } + + public String connectionSpec() { + return session.getConnectionSpec(); + } + + public Message awaitMessage(int timeout, TimeUnit unit) throws InterruptedException { + return queue.awaitMessage(timeout, unit); + } + + public void ackMessage(Message msg) { + session.acknowledge(msg); + } + + public void sendReply(Reply reply) { + session.reply(reply); + } + + public String slobrokId() { + return slobrokId; + } + + public void close() { + session.destroy(); + mbus.destroy(); + if (slobrok != null) { + slobrok.stop(); + } + } + + public static RemoteServer newInstanceWithInternSlobrok() { + return new RemoteServer(newSlobrok(), null, new SimpleProtocol(), "remote"); + } + + public static RemoteServer newInstanceWithExternSlobrok(String slobrokId) { + return new RemoteServer(null, slobrokId, new SimpleProtocol(), "remote"); + } + + public static RemoteServer newInstance(String slobrokId, String identity, Protocol protocol) { + return new RemoteServer(null, slobrokId, protocol, identity); + } + + public static RemoteServer newInstanceWithProtocol(Protocol protocol) { + return new RemoteServer(newSlobrok(), null, protocol, "remote"); + } + + private static Slobrok newSlobrok() { + try { + return new Slobrok(); + } catch (ListenFailedException e) { + throw new IllegalStateException(e); + } + } + +} diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/ReplyQueue.java b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/ReplyQueue.java new file mode 100644 index 00000000000..6c48aab5a7f --- /dev/null +++ b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/ReplyQueue.java @@ -0,0 +1,26 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.jdisc.test; + +import com.yahoo.messagebus.Reply; +import com.yahoo.messagebus.ReplyHandler; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +/** + * @author Simon Thoresen Hult + */ +public class ReplyQueue implements ReplyHandler { + + private final BlockingQueue<Reply> queue = new LinkedBlockingQueue<>(); + + @Override + public void handleReply(Reply reply) { + queue.add(reply); + } + + public Reply awaitReply(int timeout, TimeUnit unit) throws InterruptedException { + return queue.poll(timeout, unit); + } +} diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/ServerTestDriver.java b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/ServerTestDriver.java new file mode 100644 index 00000000000..e59db28e886 --- /dev/null +++ b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/ServerTestDriver.java @@ -0,0 +1,155 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.jdisc.test; + +import com.google.inject.Module; +import com.yahoo.jdisc.application.ContainerBuilder; +import com.yahoo.jdisc.handler.RequestHandler; +import com.yahoo.jdisc.test.TestDriver; +import com.yahoo.messagebus.*; +import com.yahoo.messagebus.Error; +import com.yahoo.messagebus.jdisc.MbusServer; +import com.yahoo.messagebus.network.rpc.RPCNetworkParams; +import com.yahoo.messagebus.routing.Route; +import com.yahoo.messagebus.shared.ServerSession; +import com.yahoo.messagebus.shared.SharedMessageBus; +import com.yahoo.messagebus.test.SimpleProtocol; + +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +/** + * @author Simon Thoresen Hult + */ +public class ServerTestDriver { + + private final RemoteClient client; + private final MbusServer server; + private final TestDriver driver; + + private ServerTestDriver(RemoteClient client, boolean activateContainer, RequestHandler requestHandler, + Protocol protocol, Module... guiceModules) + { + this.client = client; + driver = TestDriver.newSimpleApplicationInstanceWithoutOsgi(guiceModules); + if (activateContainer) { + ContainerBuilder builder = driver.newContainerBuilder(); + if (requestHandler != null) { + builder.serverBindings().bind("mbus://*/*", requestHandler); + } + driver.activateContainer(builder); + } + + MessageBusParams mbusParams = new MessageBusParams().addProtocol(protocol); + RPCNetworkParams netParams = new RPCNetworkParams().setSlobrokConfigId(client.slobrokId()); + SharedMessageBus mbus = SharedMessageBus.newInstance(mbusParams, netParams); + ServerSession session = mbus.newDestinationSession(new DestinationSessionParams()); + server = new MbusServer(driver, session); + server.start(); + session.release(); + mbus.release(); + } + + public boolean sendMessage(Message msg) { + msg.setRoute(Route.parse(server.connectionSpec())); + msg.getTrace().setLevel(9); + return client.sendMessage(msg).isAccepted(); + } + + public Reply awaitReply() { + Reply reply = null; + try { + reply = client.awaitReply(60, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + if (reply != null) { + System.out.println(reply.getTrace()); + } + return reply; + } + + public Reply awaitSuccess() { + Reply reply = awaitReply(); + if (reply == null || reply.hasErrors()) { + return null; + } + return reply; + } + + public Reply awaitErrors(Integer... errCodes) { + Reply reply = awaitReply(); + if (reply == null) { + return null; + } + List<Integer> lst = new LinkedList<>(Arrays.asList(errCodes)); + for (int i = 0, len = reply.getNumErrors(); i < len; ++i) { + Error err = reply.getError(i); + System.out.println(err); + int idx = lst.indexOf(err.getCode()); + if (idx < 0) { + return null; + } + lst.remove(idx); + } + if (!lst.isEmpty()) { + return null; + } + return reply; + } + + public boolean close() { + server.close(); + server.release(); + client.close(); + return driver.close(); + } + + public TestDriver parent() { + return driver; + } + + public RemoteClient client() { + return client; + } + + public MbusServer server() { + return server; + } + + public static ServerTestDriver newInstance(RequestHandler requestHandler, boolean network, Module... guiceModules) { + return new ServerTestDriver(RemoteClient.newInstanceWithInternSlobrok(network), true, requestHandler, + new SimpleProtocol(), guiceModules); + } + + public static ServerTestDriver newInstanceWithProtocol(Protocol protocol, RequestHandler requestHandler, + boolean network, Module... guiceModules) + { + return new ServerTestDriver(RemoteClient.newInstanceWithInternSlobrok(network), true, requestHandler, protocol, + guiceModules); + } + + public static ServerTestDriver newInstanceWithExternSlobrok(String slobrokId, RequestHandler requestHandler, + boolean network, Module... guiceModules) + { + return new ServerTestDriver(RemoteClient.newInstanceWithExternSlobrok(slobrokId, network), + true, requestHandler, new SimpleProtocol(), guiceModules); + } + + public static ServerTestDriver newInactiveInstance(boolean network, Module... guiceModules) { + return new ServerTestDriver(RemoteClient.newInstanceWithInternSlobrok(network), false, null, + new SimpleProtocol(), guiceModules); + } + + public static ServerTestDriver newInactiveInstanceWithProtocol(Protocol protocol, boolean network, Module... guiceModules) { + return new ServerTestDriver(RemoteClient.newInstanceWithProtocolAndInternSlobrok(protocol, network), false, null, + protocol, guiceModules); + } + + public static ServerTestDriver newUnboundInstance(boolean network, Module... guiceModules) { + return new ServerTestDriver(RemoteClient.newInstanceWithInternSlobrok(network), true, null, + new SimpleProtocol(), guiceModules); + } + +} diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/network/package-info.java b/container-messagebus/src/main/java/com/yahoo/messagebus/network/package-info.java new file mode 100644 index 00000000000..72f563e8bd7 --- /dev/null +++ b/container-messagebus/src/main/java/com/yahoo/messagebus/network/package-info.java @@ -0,0 +1,5 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +@ExportPackage +package com.yahoo.messagebus.network; + +import com.yahoo.osgi.annotation.ExportPackage; diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/network/rpc/package-info.java b/container-messagebus/src/main/java/com/yahoo/messagebus/network/rpc/package-info.java new file mode 100644 index 00000000000..7b468813713 --- /dev/null +++ b/container-messagebus/src/main/java/com/yahoo/messagebus/network/rpc/package-info.java @@ -0,0 +1,5 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +@ExportPackage +package com.yahoo.messagebus.network.rpc; + +import com.yahoo.osgi.annotation.ExportPackage; diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/package-info.java b/container-messagebus/src/main/java/com/yahoo/messagebus/package-info.java new file mode 100644 index 00000000000..63b713e70e0 --- /dev/null +++ b/container-messagebus/src/main/java/com/yahoo/messagebus/package-info.java @@ -0,0 +1,5 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +@ExportPackage +package com.yahoo.messagebus; + +import com.yahoo.osgi.annotation.ExportPackage; diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/routing/package-info.java b/container-messagebus/src/main/java/com/yahoo/messagebus/routing/package-info.java new file mode 100644 index 00000000000..ba8fc5fafba --- /dev/null +++ b/container-messagebus/src/main/java/com/yahoo/messagebus/routing/package-info.java @@ -0,0 +1,5 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +@ExportPackage +package com.yahoo.messagebus.routing; + +import com.yahoo.osgi.annotation.ExportPackage; diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/shared/ClientSession.java b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/ClientSession.java new file mode 100644 index 00000000000..0964a254cf2 --- /dev/null +++ b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/ClientSession.java @@ -0,0 +1,14 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.shared; + +import com.yahoo.jdisc.SharedResource; +import com.yahoo.messagebus.Message; +import com.yahoo.messagebus.Result; + +/** + * @author Simon Thoresen Hult + */ +public interface ClientSession extends SharedResource { + + public Result sendMessage(Message msg); +} diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/shared/NullNetwork.java b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/NullNetwork.java new file mode 100644 index 00000000000..ad58d6b9a5e --- /dev/null +++ b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/NullNetwork.java @@ -0,0 +1,73 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.shared; + +import com.yahoo.jrt.slobrok.api.IMirror; +import com.yahoo.messagebus.Message; +import com.yahoo.messagebus.network.Network; +import com.yahoo.messagebus.network.NetworkOwner; +import com.yahoo.messagebus.routing.RoutingNode; + +import java.util.List; + +/** + * <p>Used by SharedMessageBus as a network when the container runs in LocalApplication with no network services.</p> + * + * @author <a href="mailto:vegardh@yahoo-inc.com">Vegard Havdal</a> + */ +class NullNetwork implements Network { + + @Override + public boolean waitUntilReady(double seconds) { + return true; + } + + @Override + public void attach(NetworkOwner owner) { + + } + + @Override + public void registerSession(String session) { + + } + + @Override + public void unregisterSession(String session) { + + } + + @Override + public boolean allocServiceAddress(RoutingNode recipient) { + return false; + } + + @Override + public void freeServiceAddress(RoutingNode recipient) { + + } + + @Override + public void send(Message msg, List<RoutingNode> recipients) { + + } + + @Override + public void sync() { + + } + + @Override + public void shutdown() { + + } + + @Override + public String getConnectionSpec() { + return null; + } + + @Override + public IMirror getMirror() { + return null; + } +} diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/shared/ServerSession.java b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/ServerSession.java new file mode 100644 index 00000000000..56713815c7a --- /dev/null +++ b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/ServerSession.java @@ -0,0 +1,22 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.shared; + +import com.yahoo.jdisc.SharedResource; +import com.yahoo.messagebus.MessageHandler; +import com.yahoo.messagebus.Reply; + +/** + * @author Simon Thoresen Hult + */ +public interface ServerSession extends SharedResource { + + public MessageHandler getMessageHandler(); + + public void setMessageHandler(MessageHandler msgHandler); + + public void sendReply(Reply reply); + + public String connectionSpec(); + + public String name(); +} diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedDestinationSession.java b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedDestinationSession.java new file mode 100644 index 00000000000..7da164757cd --- /dev/null +++ b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedDestinationSession.java @@ -0,0 +1,85 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.shared; + +import com.yahoo.jdisc.AbstractResource; +import com.yahoo.jdisc.ResourceReference; +import java.util.logging.Level; +import com.yahoo.messagebus.DestinationSession; +import com.yahoo.messagebus.DestinationSessionParams; +import com.yahoo.messagebus.EmptyReply; +import com.yahoo.messagebus.Error; +import com.yahoo.messagebus.ErrorCode; +import com.yahoo.messagebus.Message; +import com.yahoo.messagebus.MessageHandler; +import com.yahoo.messagebus.Reply; + +import java.util.concurrent.atomic.AtomicReference; +import java.util.logging.Logger; + +/** + * @author Simon Thoresen Hult + */ +public class SharedDestinationSession extends AbstractResource implements MessageHandler, ServerSession { + + private static final Logger log = Logger.getLogger(SharedDestinationSession.class.getName()); + private final AtomicReference<MessageHandler> msgHandler = new AtomicReference<>(); + private final DestinationSession session; + private final ResourceReference mbusReference; + + SharedDestinationSession(SharedMessageBus mbus, DestinationSessionParams params) { + this.msgHandler.set(params.getMessageHandler()); + this.session = mbus.messageBus().createDestinationSession(params.setMessageHandler(this)); + this.mbusReference = mbus.refer(); + } + + public DestinationSession session() { + return session; + } + + @Override + public void sendReply(Reply reply) { + session.reply(reply); + } + + @Override + public MessageHandler getMessageHandler() { + return msgHandler.get(); + } + + @Override + public void setMessageHandler(MessageHandler msgHandler) { + if (!this.msgHandler.compareAndSet(null, msgHandler)) { + throw new IllegalStateException("Message handler already registered."); + } + } + + @Override + public void handleMessage(Message msg) { + MessageHandler msgHandler = this.msgHandler.get(); + if (msgHandler == null) { + Reply reply = new EmptyReply(); + reply.swapState(msg); + reply.addError(new Error(ErrorCode.SESSION_BUSY, "Session not fully configured yet.")); + sendReply(reply); + return; + } + msgHandler.handleMessage(msg); + } + + @Override + public String connectionSpec() { + return session.getConnectionSpec(); + } + + @Override + public String name() { + return session.getName(); + } + + @Override + protected void destroy() { + log.log(Level.FINE, "Destroying shared destination session."); + session.destroy(); + mbusReference.close(); + } +} diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedIntermediateSession.java b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedIntermediateSession.java new file mode 100644 index 00000000000..5c9fab46e34 --- /dev/null +++ b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedIntermediateSession.java @@ -0,0 +1,104 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.shared; + +import com.yahoo.jdisc.AbstractResource; +import com.yahoo.jdisc.ResourceReference; +import java.util.logging.Level; +import com.yahoo.messagebus.EmptyReply; +import com.yahoo.messagebus.Error; +import com.yahoo.messagebus.ErrorCode; +import com.yahoo.messagebus.IntermediateSession; +import com.yahoo.messagebus.IntermediateSessionParams; +import com.yahoo.messagebus.Message; +import com.yahoo.messagebus.MessageHandler; +import com.yahoo.messagebus.Reply; +import com.yahoo.messagebus.ReplyHandler; +import com.yahoo.messagebus.Result; + +import java.util.concurrent.atomic.AtomicReference; +import java.util.logging.Logger; + +/** + * @author Simon Thoresen Hult + */ +public class SharedIntermediateSession extends AbstractResource + implements ClientSession, ServerSession, MessageHandler, ReplyHandler +{ + + private static final Logger log = Logger.getLogger(SharedIntermediateSession.class.getName()); + private final AtomicReference<MessageHandler> msgHandler = new AtomicReference<>(); + private final IntermediateSession session; + private final ResourceReference mbusReference; + + public SharedIntermediateSession(SharedMessageBus mbus, IntermediateSessionParams params) { + if (params.getReplyHandler() != null) { + throw new IllegalArgumentException("Reply handler must be null."); + } + this.msgHandler.set(params.getMessageHandler()); + this.session = mbus.messageBus().createIntermediateSession(params.setReplyHandler(this) + .setMessageHandler(this)); + this.mbusReference = mbus.refer(); + } + + public IntermediateSession session() { + return session; + } + + @Override + public Result sendMessage(Message msg) { + session.forward(msg); + return Result.ACCEPTED; + } + + @Override + public void sendReply(Reply reply) { + session.forward(reply); + } + + @Override + public MessageHandler getMessageHandler() { + return msgHandler.get(); + } + + @Override + public void setMessageHandler(MessageHandler msgHandler) { + if (!this.msgHandler.compareAndSet(null, msgHandler)) { + throw new IllegalStateException("Message handler already registered."); + } + } + + @Override + public void handleMessage(Message msg) { + MessageHandler msgHandler = this.msgHandler.get(); + if (msgHandler == null) { + Reply reply = new EmptyReply(); + reply.swapState(msg); + reply.addError(new Error(ErrorCode.SESSION_BUSY, "Session not fully configured yet.")); + sendReply(reply); + return; + } + msgHandler.handleMessage(msg); + } + + @Override + public void handleReply(Reply reply) { + reply.popHandler().handleReply(reply); + } + + @Override + public String connectionSpec() { + return session.getConnectionSpec(); + } + + @Override + public String name() { + return session.getName(); + } + + @Override + protected void destroy() { + log.log(Level.FINE, "Destroying shared intermediate session."); + session.destroy(); + mbusReference.close(); + } +} diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedMessageBus.java b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedMessageBus.java new file mode 100644 index 00000000000..dd135a51378 --- /dev/null +++ b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedMessageBus.java @@ -0,0 +1,68 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.shared; + +import com.yahoo.config.subscription.ConfigGetter; +import com.yahoo.jdisc.AbstractResource; +import java.util.logging.Level; +import com.yahoo.messagebus.DestinationSessionParams; +import com.yahoo.messagebus.IntermediateSessionParams; +import com.yahoo.messagebus.MessageBus; +import com.yahoo.messagebus.MessageBusParams; +import com.yahoo.messagebus.SourceSessionParams; +import com.yahoo.messagebus.network.Network; +import com.yahoo.messagebus.network.rpc.RPCNetwork; +import com.yahoo.messagebus.network.rpc.RPCNetworkParams; +import com.yahoo.cloud.config.SlobroksConfig; + +import java.util.logging.Logger; + +/** + * @author Simon Thoresen Hult + */ +public class SharedMessageBus extends AbstractResource { + + private static final Logger log = Logger.getLogger(SharedMessageBus.class.getName()); + private final MessageBus mbus; + + public SharedMessageBus(MessageBus mbus) { + mbus.getClass(); // throws NullPointerException + this.mbus = mbus; + } + + public MessageBus messageBus() { + return mbus; + } + + @Override + protected void destroy() { + log.log(Level.FINE, "Destroying shared message bus."); + mbus.destroy(); + } + + public SharedSourceSession newSourceSession(SourceSessionParams params) { + return new SharedSourceSession(this, params); + } + + public SharedIntermediateSession newIntermediateSession(IntermediateSessionParams params) { + return new SharedIntermediateSession(this, params); + } + + public SharedDestinationSession newDestinationSession(DestinationSessionParams params) { + return new SharedDestinationSession(this, params); + } + + public static SharedMessageBus newInstance(MessageBusParams mbusParams, RPCNetworkParams netParams) { + return new SharedMessageBus(new MessageBus(newNetwork(netParams), mbusParams)); + } + + private static Network newNetwork(RPCNetworkParams params) { + SlobroksConfig cfg = params.getSlobroksConfig(); + if (cfg == null) { + cfg = ConfigGetter.getConfig(SlobroksConfig.class, params.getSlobrokConfigId()); + } + if (cfg.slobrok().isEmpty()) { + return new NullNetwork(); // for LocalApplication + } + return new RPCNetwork(params); + } +} diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedSourceSession.java b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedSourceSession.java new file mode 100644 index 00000000000..56071682349 --- /dev/null +++ b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedSourceSession.java @@ -0,0 +1,58 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.shared; + +import com.yahoo.jdisc.AbstractResource; +import com.yahoo.jdisc.ResourceReference; +import java.util.logging.Level; +import com.yahoo.messagebus.Message; +import com.yahoo.messagebus.Reply; +import com.yahoo.messagebus.ReplyHandler; +import com.yahoo.messagebus.Result; +import com.yahoo.messagebus.SourceSession; +import com.yahoo.messagebus.SourceSessionParams; + +import java.util.logging.Logger; + +/** + * @author Simon Thoresen Hult + */ +public class SharedSourceSession extends AbstractResource implements ClientSession, ReplyHandler { + + private static final Logger log = Logger.getLogger(SharedSourceSession.class.getName()); + private final SourceSession session; + private final ResourceReference mbusReference; + + public SharedSourceSession(SharedMessageBus mbus, SourceSessionParams params) { + if (params.getReplyHandler() != null) { + throw new IllegalArgumentException("Reply handler must be null."); + } + this.session = mbus.messageBus().createSourceSession(params.setReplyHandler(this)); + this.mbusReference = mbus.refer(); + } + + public SourceSession session() { + return session; + } + + @Override + public Result sendMessage(Message msg) { + return session.send(msg); + } + + public Result sendMessageBlocking(Message msg) throws InterruptedException { + return session.sendBlocking(msg); + } + + @Override + public void handleReply(Reply reply) { + reply.popHandler().handleReply(reply); + } + + @Override + protected void destroy() { + log.log(Level.FINE, "Destroying shared source session."); + session.close(); + mbusReference.close(); + } + +} diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/shared/package-info.java b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/package-info.java new file mode 100644 index 00000000000..941a0dc4c5c --- /dev/null +++ b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/package-info.java @@ -0,0 +1,8 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +/** + * Not a public API, exported for use in internal components. + */ +@ExportPackage +package com.yahoo.messagebus.shared; + +import com.yahoo.osgi.annotation.ExportPackage; diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/test/package-info.java b/container-messagebus/src/main/java/com/yahoo/messagebus/test/package-info.java new file mode 100644 index 00000000000..42bc03b6e17 --- /dev/null +++ b/container-messagebus/src/main/java/com/yahoo/messagebus/test/package-info.java @@ -0,0 +1,5 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +@ExportPackage +package com.yahoo.messagebus.test; + +import com.yahoo.osgi.annotation.ExportPackage; diff --git a/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/ClientThreadingTestCase.java b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/ClientThreadingTestCase.java new file mode 100644 index 00000000000..62a9a864781 --- /dev/null +++ b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/ClientThreadingTestCase.java @@ -0,0 +1,149 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.jdisc; + +import com.yahoo.jdisc.application.ContainerBuilder; +import com.yahoo.jdisc.handler.FutureResponse; +import com.yahoo.jdisc.handler.ResponseHandler; +import com.yahoo.jdisc.test.TestDriver; +import com.yahoo.messagebus.DestinationSession; +import com.yahoo.messagebus.DestinationSessionParams; +import com.yahoo.messagebus.Message; +import com.yahoo.messagebus.MessageBus; +import com.yahoo.messagebus.MessageBusParams; +import com.yahoo.messagebus.MessageHandler; +import com.yahoo.messagebus.SourceSessionParams; +import com.yahoo.messagebus.network.local.LocalNetwork; +import com.yahoo.messagebus.network.local.LocalWire; +import com.yahoo.messagebus.routing.Route; +import com.yahoo.messagebus.shared.SharedMessageBus; +import com.yahoo.messagebus.shared.SharedSourceSession; +import com.yahoo.messagebus.test.SimpleMessage; +import com.yahoo.messagebus.test.SimpleProtocol; +import org.junit.Ignore; +import org.junit.Test; + +import java.net.URI; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + +/** + * @author Simon Thoresen Hult + */ +public class ClientThreadingTestCase { + + private static final int NUM_THREADS = 32; + private static final int NUM_REQUESTS = 1000; + + @Test + @Ignore + public void requireThatClientIsThreadSafe() throws Exception { + final LocalWire wire = new LocalWire(); + final Client client = new Client(wire); + final Server server = new Server(wire); + + final List<Callable<Boolean>> lst = new LinkedList<>(); + final Route route = Route.parse(server.session.getConnectionSpec()); + for (int i = 0; i < NUM_THREADS; ++i) { + lst.add(new RequestTask(client, route)); + } + final ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS); + for (final Future<Boolean> res : executor.invokeAll(lst, 60, TimeUnit.SECONDS)) { + assertThat(res.get(), is(true)); + } + + assertThat(client.close(), is(true)); + assertThat(server.close(), is(true)); + } + + private static final class RequestTask implements Callable<Boolean> { + + final Client client; + final Route route; + + RequestTask(final Client client, final Route route) { + this.client = client; + this.route = route; + } + + @Override + public Boolean call() throws Exception { + for (int i = 0; i < NUM_REQUESTS; ++i) { + final FutureResponse responseHandler = new FutureResponse(); + client.send(new SimpleMessage("foo").setRoute(route), responseHandler); + responseHandler.get(60, TimeUnit.SECONDS); + } + return true; + } + } + + private static class Client { + + final MbusClient delegate; + final TestDriver driver; + + Client(final LocalWire wire) { + driver = TestDriver.newSimpleApplicationInstanceWithoutOsgi(); + delegate = newMbusClient(wire); + + final ContainerBuilder builder = driver.newContainerBuilder(); + builder.clientBindings().bind("mbus://*/*", delegate); + driver.activateContainer(builder); + delegate.start(); + } + + void send(final Message msg, final ResponseHandler handler) { + final MbusRequest request = new MbusRequest(driver, URI.create("mbus://remote/"), msg); + request.setServerRequest(false); + request.connect(handler).close(null); + request.release(); + } + + boolean close() { + delegate.release(); + return driver.close(); + } + } + + private static class Server implements MessageHandler { + + final MessageBus mbus; + final DestinationSession session; + + Server(final LocalWire wire) { + mbus = new MessageBus( + new LocalNetwork(wire), + new MessageBusParams().addProtocol(new SimpleProtocol())); + session = mbus.createDestinationSession( + new DestinationSessionParams().setMessageHandler(this)); + } + + @Override + public void handleMessage(final Message msg) { + session.acknowledge(msg); + } + + boolean close() { + return session.destroy() && mbus.destroy(); + } + } + + private static MbusClient newMbusClient(final LocalWire wire) { + final SharedMessageBus mbus = new SharedMessageBus(new MessageBus( + new LocalNetwork(wire), + new MessageBusParams().addProtocol(new SimpleProtocol()))); + final SharedSourceSession session = mbus.newSourceSession( + new SourceSessionParams()); + final MbusClient client = new MbusClient(session); + session.release(); + mbus.release(); + return client; + } +} diff --git a/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusClientTestCase.java b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusClientTestCase.java new file mode 100644 index 00000000000..9cfd1fd02b9 --- /dev/null +++ b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusClientTestCase.java @@ -0,0 +1,345 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.jdisc; + +import com.yahoo.jdisc.Request; +import com.yahoo.jdisc.ResourceReference; +import com.yahoo.jdisc.Response; +import com.yahoo.jdisc.handler.CompletionHandler; +import com.yahoo.jdisc.handler.ContentChannel; +import com.yahoo.jdisc.handler.RequestDeniedException; +import com.yahoo.jdisc.handler.ResponseHandler; +import com.yahoo.messagebus.*; +import com.yahoo.messagebus.jdisc.test.ClientTestDriver; +import com.yahoo.messagebus.shared.ClientSession; +import com.yahoo.messagebus.test.SimpleMessage; +import com.yahoo.messagebus.test.SimpleReply; +import org.junit.Test; + +import java.net.URI; +import java.nio.ByteBuffer; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.*; + +/** + * @author Simon Thoresen Hult + */ +public class MbusClientTestCase { + + @Test + public void requireThatClientRetainsSession() { + MySession session = new MySession(); + assertEquals(1, session.refCount); + MbusClient client = new MbusClient(session); + assertEquals(2, session.refCount); + session.release(); + assertEquals(1, session.refCount); + client.destroy(); + assertEquals(0, session.refCount); + } + + @Test + public void requireThatRequestResponseWorks() { + ClientTestDriver driver = ClientTestDriver.newInstance(); + MyResponseHandler responseHandler = MyResponseHandler.newInstance(); + assertTrue(driver.sendMessage(new SimpleMessage("foo"), responseHandler)); + assertTrue(driver.awaitMessageAndSendReply(new EmptyReply())); + + Response response = responseHandler.awaitResponse(); + assertNotNull(response); + assertEquals(Response.Status.OK, response.getStatus()); + assertTrue(driver.close()); + } + + @Test + public void requireThatNonMbusRequestIsDenied() throws InterruptedException { + ClientTestDriver driver = ClientTestDriver.newInstance(); + Request serverReq = null; + Request clientReq = null; + try { + serverReq = driver.newServerRequest(); + clientReq = new Request(serverReq, URI.create("mbus://host/path")); + clientReq.connect(MyResponseHandler.newInstance()); + fail(); + } catch (RequestDeniedException e) { + System.out.println(e.getMessage()); + } finally { + if (serverReq != null) { + serverReq.release(); + } + if (clientReq != null) { + clientReq.release(); + } + } + assertTrue(driver.close()); + } + + @Test + public void requireThatRequestContentDoesNotSupportWrite() throws InterruptedException { + ClientTestDriver driver = ClientTestDriver.newInstance(); + MyResponseHandler responseHandler = MyResponseHandler.newInstance(); + + Request request = null; + ContentChannel content; + try { + request = driver.newClientRequest(new SimpleMessage("foo")); + content = request.connect(responseHandler); + } finally { + if (request != null) { + request.release(); + } + } + try { + content.write(ByteBuffer.allocate(69), null); + fail(); + } catch (UnsupportedOperationException e) { + + } + content.close(null); + + assertTrue(driver.awaitMessageAndSendReply(new EmptyReply())); + assertNotNull(responseHandler.awaitResponse()); + assertTrue(driver.close()); + } + + @Test + public void requireThatResponseIsMbus() { + ClientTestDriver driver = ClientTestDriver.newInstance(); + MyResponseHandler responseHandler = MyResponseHandler.newInstance(); + assertTrue(driver.sendMessage(new SimpleMessage("foo"), responseHandler)); + assertTrue(driver.awaitMessageAndSendReply(new EmptyReply())); + + Response response = responseHandler.awaitResponse(); + assertTrue(response instanceof MbusResponse); + Reply reply = ((MbusResponse)response).getReply(); + assertTrue(reply instanceof EmptyReply); + assertTrue(driver.close()); + } + + @Test + public void requireThatServerReceivesGivenMessage() { + ClientTestDriver driver = ClientTestDriver.newInstance(); + MyResponseHandler responseHandler = MyResponseHandler.newInstance(); + assertTrue(driver.sendMessage(new SimpleMessage("foo"), responseHandler)); + + Message msg = driver.awaitMessage(); + assertTrue(msg instanceof SimpleMessage); + assertEquals("foo", ((SimpleMessage)msg).getValue()); + + Reply reply = new EmptyReply(); + reply.swapState(msg); + driver.sendReply(reply); + + assertNotNull(responseHandler.awaitResponse()); + assertTrue(driver.close()); + } + + @Test + public void requireThatClientReceivesGivenReply() { + ClientTestDriver driver = ClientTestDriver.newInstance(); + MyResponseHandler responseHandler = MyResponseHandler.newInstance(); + assertTrue(driver.sendMessage(new SimpleMessage("foo"), responseHandler)); + + Message msg = driver.awaitMessage(); // TODO: Timing sensitive + assertNotNull(msg); + Reply reply = new SimpleReply("bar"); + reply.swapState(msg); + driver.sendReply(reply); + + Response response = responseHandler.awaitResponse(); + assertTrue(response instanceof MbusResponse); + reply = ((MbusResponse)response).getReply(); + assertTrue(reply instanceof SimpleReply); + assertEquals("bar", ((SimpleReply)reply).getValue()); + assertTrue(driver.close()); + } + + @Test + public void requireThatStateIsTransferredToResponse() { + ClientTestDriver driver = ClientTestDriver.newInstance(); + MyResponseHandler responseHandler = MyResponseHandler.newInstance(); + + Message msg = new SimpleMessage("foo"); + Object pushedCtx = new Object(); + msg.setContext(pushedCtx); + ReplyHandler pushedHandler = new MyReplyHandler(); + msg.pushHandler(pushedHandler); + Object currentCtx = new Object(); + msg.setContext(currentCtx); + msg.getTrace().setLevel(6); + assertTrue(driver.sendMessage(msg, responseHandler)); + assertTrue(driver.awaitMessageAndSendReply(new EmptyReply())); + + Response response = responseHandler.awaitResponse(); + assertTrue(response.getClass().getName(), response instanceof MbusResponse); + Reply reply = ((MbusResponse)response).getReply(); + assertSame(currentCtx, reply.getContext()); + assertEquals(6, reply.getTrace().getLevel()); + assertSame(pushedHandler, reply.popHandler()); + assertSame(pushedCtx, reply.getContext()); + assertTrue(driver.close()); + } + + @Test + public void requireThatStateIsTransferredToSyncMbusSendFailureResponse() { + ClientTestDriver driver = ClientTestDriver.newInstance(); + driver.sourceSession().close(); + + Message msg = new SimpleMessage("foo"); + ReplyHandler pushedHandler = new MyReplyHandler(); + Object pushedCtx = new Object(); + msg.setContext(pushedCtx); + msg.pushHandler(pushedHandler); + Object currentCtx = new Object(); + msg.setContext(currentCtx); + msg.getTrace().setLevel(6); + + MyResponseHandler responseHandler = MyResponseHandler.newInstance(); + driver.sendMessage(msg, responseHandler); + + Response response = responseHandler.awaitResponse(); + assertNotNull(response); + assertTrue(response.getClass().getName(), response instanceof MbusResponse); + Reply reply = ((MbusResponse)response).getReply(); + assertSame(currentCtx, reply.getContext()); + assertEquals(6, reply.getTrace().getLevel()); + assertSame(pushedHandler, reply.popHandler()); + assertSame(pushedCtx, reply.getContext()); + assertTrue(driver.close()); + } + + @Test + public void requireThatStateIsTransferredToTimeoutResponse() throws InterruptedException { + ClientTestDriver driver = ClientTestDriver.newInstance(); + MyResponseHandler responseHandler = MyResponseHandler.newInstance(); + + Message msg = new SimpleMessage("foo"); + ReplyHandler pushedHandler = new MyReplyHandler(); + Object pushedCtx = new Object(); + msg.setContext(pushedCtx); + msg.pushHandler(pushedHandler); + Object currentCtx = new Object(); + msg.setContext(currentCtx); + msg.getTrace().setLevel(6); + + Request request = driver.newClientRequest(msg); + request.setTimeout(1, TimeUnit.MILLISECONDS); + assertTrue(driver.sendRequest(request, responseHandler)); + request.release(); + + Response response = responseHandler.awaitResponse(); + assertNotNull(response); + assertTrue(response.getClass().getName(), response instanceof MbusResponse); + Reply reply = ((MbusResponse)response).getReply(); + assertSame(currentCtx, reply.getContext()); + assertEquals(6, reply.getTrace().getLevel()); + assertSame(pushedHandler, reply.popHandler()); + assertSame(pushedCtx, reply.getContext()); + assertTrue(driver.close()); + } + + @Test + public void requireThatSyncMbusSendFailureRespondsWithError() { + ClientTestDriver driver = ClientTestDriver.newInstance(); + driver.sourceSession().close(); + + MyResponseHandler responseHandler = MyResponseHandler.newInstance(); + driver.sendMessage(new SimpleMessage("foo"), responseHandler); + Response response = responseHandler.awaitResponse(); + assertNotNull(response); + assertTrue(response.getClass().getName(), response instanceof MbusResponse); + Reply reply = ((MbusResponse)response).getReply(); + assertEquals(1, reply.getNumErrors()); + assertEquals(ErrorCode.SEND_QUEUE_CLOSED, reply.getError(0).getCode()); + assertTrue(driver.close()); + } + + private static class MyResponseHandler implements ResponseHandler { + + final MyResponseContent content; + Response response; + + MyResponseHandler(MyResponseContent content) { + this.content = content; + } + + Response awaitResponse() { + try { + content.closeLatch.await(60, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw new IllegalStateException(e); + } + if (response instanceof MbusResponse) { + //System.out.println(((MbusResponse)response).getReply().getTrace()); + } + return response; + } + + @Override + public ContentChannel handleResponse(Response response) { + this.response = response; + return content; + } + + static MyResponseHandler newInstance() { + return new MyResponseHandler(new MyResponseContent()); + } + } + + private static class MyResponseContent implements ContentChannel { + + final CountDownLatch writeLatch = new CountDownLatch(1); + final CountDownLatch closeLatch = new CountDownLatch(1); + + @Override + public void write(ByteBuffer buf, CompletionHandler handler) { + if (handler != null) { + handler.completed(); + } + writeLatch.countDown(); + } + + @Override + public void close(CompletionHandler handler) { + if (handler != null) { + handler.completed(); + } + closeLatch.countDown(); + } + } + + private static class MySession implements ClientSession { + + int refCount = 1; + + @Override + public Result sendMessage(Message msg) { + return null; + } + + @Override + public ResourceReference refer() { + ++refCount; + return new ResourceReference() { + @Override + public void close() { + --refCount; + } + }; + } + + @Override + public void release() { + --refCount; + } + } + + private static class MyReplyHandler implements ReplyHandler { + + @Override + public void handleReply(Reply reply) { + + } + } +} diff --git a/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusRequestHandlerTestCase.java b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusRequestHandlerTestCase.java new file mode 100644 index 00000000000..316ad18bae9 --- /dev/null +++ b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusRequestHandlerTestCase.java @@ -0,0 +1,121 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.jdisc; + +import com.google.common.util.concurrent.ListenableFuture; +import com.yahoo.jdisc.Request; +import com.yahoo.jdisc.Response; +import com.yahoo.jdisc.application.ContainerBuilder; +import com.yahoo.jdisc.handler.RequestDispatch; +import com.yahoo.jdisc.test.TestDriver; +import com.yahoo.messagebus.EmptyReply; +import com.yahoo.messagebus.Message; +import com.yahoo.messagebus.Reply; +import com.yahoo.messagebus.test.SimpleMessage; +import org.junit.Test; + +import java.net.URI; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * @author Simon Thoresen Hult + */ +public class MbusRequestHandlerTestCase { + + @Test + public void requireThatNonMbusRequestThrows() throws Exception { + final TestDriver driver = newTestDriver(SameThreadReplier.INSTANCE); + try { + new RequestDispatch() { + + @Override + protected Request newRequest() { + return new Request(driver, URI.create("mbus://localhost/")); + } + }.connect(); + fail(); + } catch (UnsupportedOperationException e) { + assertEquals("Expected MbusRequest, got com.yahoo.jdisc.Request.", e.getMessage()); + } + assertTrue(driver.close()); + } + + @Test + public void requireThatHandlerCanRespondInSameThread() throws Exception { + TestDriver driver = newTestDriver(SameThreadReplier.INSTANCE); + + Response response = dispatchMessage(driver, new SimpleMessage("msg")).get(60, TimeUnit.SECONDS); + assertTrue(response instanceof MbusResponse); + assertEquals(Response.Status.OK, response.getStatus()); + Reply reply = ((MbusResponse)response).getReply(); + assertTrue(reply instanceof EmptyReply); + assertFalse(reply.hasErrors()); + + assertTrue(driver.close()); + } + + @Test + public void requireThatHandlerCanRespondInOtherThread() throws Exception { + TestDriver driver = newTestDriver(ThreadedReplier.INSTANCE); + + Response response = dispatchMessage(driver, new SimpleMessage("msg")).get(60, TimeUnit.SECONDS); + assertTrue(response instanceof MbusResponse); + assertEquals(Response.Status.OK, response.getStatus()); + Reply reply = ((MbusResponse)response).getReply(); + assertTrue(reply instanceof EmptyReply); + assertFalse(reply.hasErrors()); + + assertTrue(driver.close()); + } + + private static TestDriver newTestDriver(MbusRequestHandler handler) { + TestDriver driver = TestDriver.newSimpleApplicationInstanceWithoutOsgi(); + ContainerBuilder builder = driver.newContainerBuilder(); + builder.serverBindings().bind("mbus://*/*", handler); + driver.activateContainer(builder); + return driver; + } + + private static ListenableFuture<Response> dispatchMessage(final TestDriver driver, final Message msg) { + return new RequestDispatch() { + + @Override + protected Request newRequest() { + return new MbusRequest(driver, URI.create("mbus://localhost/"), msg); + } + }.dispatch(); + } + + private static class SameThreadReplier extends MbusRequestHandler { + + final static SameThreadReplier INSTANCE = new SameThreadReplier(); + + @Override + public void handleMessage(Message msg) { + Reply reply = new EmptyReply(); + reply.swapState(msg); + reply.popHandler().handleReply(reply); + } + } + + private static class ThreadedReplier extends MbusRequestHandler { + + final static ThreadedReplier INSTANCE = new ThreadedReplier(); + + @Override + public void handleMessage(final Message msg) { + Executors.newSingleThreadExecutor().execute(new Runnable() { + + @Override + public void run() { + SameThreadReplier.INSTANCE.handleMessage(msg); + } + }); + } + } +} diff --git a/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusRequestTestCase.java b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusRequestTestCase.java new file mode 100644 index 00000000000..c68ab4e6742 --- /dev/null +++ b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusRequestTestCase.java @@ -0,0 +1,73 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.jdisc; + +import com.yahoo.jdisc.test.TestDriver; +import com.yahoo.messagebus.Message; +import com.yahoo.messagebus.test.SimpleMessage; +import com.yahoo.text.Utf8String; +import org.junit.Test; + +import java.net.URI; + +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * @author Simon Thoresen Hult + */ +public class MbusRequestTestCase { + + @Test + public void requireThatAccessorsWork() { + TestDriver driver = TestDriver.newSimpleApplicationInstanceWithoutOsgi(); + driver.activateContainer(driver.newContainerBuilder()); + + MyMessage msg = new MyMessage(); + MbusRequest request = new MbusRequest(driver, URI.create("mbus://host/path"), msg); + assertSame(msg, request.getMessage()); + request.release(); + driver.close(); + } + + @Test + public void requireThatMessageCanNotBeNullInRootRequest() { + TestDriver driver = TestDriver.newSimpleApplicationInstanceWithoutOsgi(); + driver.activateContainer(driver.newContainerBuilder()); + try { + new MbusRequest(driver, URI.create("mbus://host/path"), null); + fail(); + } catch (NullPointerException e) { + // expected + } + assertTrue(driver.close()); + } + + @Test + public void requireThatMessageCanNotBeNullInChildRequest() { + TestDriver driver = TestDriver.newSimpleApplicationInstanceWithoutOsgi(); + driver.activateContainer(driver.newContainerBuilder()); + MbusRequest parent = new MbusRequest(driver, URI.create("mbus://host/path"), new SimpleMessage("foo")); + try { + new MbusRequest(parent, URI.create("mbus://host/path"), null); + fail(); + } catch (NullPointerException e) { + // expected + } + parent.release(); + assertTrue(driver.close()); + } + + private class MyMessage extends Message { + + @Override + public Utf8String getProtocol() { + return null; + } + + @Override + public int getType() { + return 0; + } + } +} diff --git a/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusResponseTestCase.java b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusResponseTestCase.java new file mode 100644 index 00000000000..eb4cb949770 --- /dev/null +++ b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusResponseTestCase.java @@ -0,0 +1,46 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.jdisc; + +import com.yahoo.jdisc.Response; +import com.yahoo.messagebus.Reply; +import com.yahoo.text.Utf8String; +import org.junit.Test; + +import static org.junit.Assert.assertSame; +import static org.junit.Assert.fail; + +/** + * @author Simon Thoresen Hult + */ +public class MbusResponseTestCase { + + @Test + public void requireThatAccessorsWork() { + MyReply reply = new MyReply(); + MbusResponse response = new MbusResponse(Response.Status.OK, reply); + assertSame(reply, response.getReply()); + } + + @Test + public void requireThatReplyCanNotBeNull() { + try { + new MbusResponse(Response.Status.OK, null); + fail(); + } catch (NullPointerException e) { + + } + } + + private class MyReply extends Reply { + + @Override + public Utf8String getProtocol() { + return null; + } + + @Override + public int getType() { + return 0; + } + } +} diff --git a/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusServerConformanceTest.java b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusServerConformanceTest.java new file mode 100644 index 00000000000..bf89f3869ed --- /dev/null +++ b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusServerConformanceTest.java @@ -0,0 +1,694 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.jdisc; + +import com.google.inject.AbstractModule; +import com.google.inject.Module; +import com.yahoo.jdisc.test.ServerProviderConformanceTest; +import com.yahoo.messagebus.DestinationSessionParams; +import com.yahoo.messagebus.MessageBus; +import com.yahoo.messagebus.MessageBusParams; +import com.yahoo.messagebus.Reply; +import com.yahoo.messagebus.ReplyHandler; +import com.yahoo.messagebus.SourceSession; +import com.yahoo.messagebus.SourceSessionParams; +import com.yahoo.messagebus.network.local.LocalNetwork; +import com.yahoo.messagebus.network.local.LocalWire; +import com.yahoo.messagebus.routing.Route; +import com.yahoo.messagebus.shared.ServerSession; +import com.yahoo.messagebus.shared.SharedMessageBus; +import com.yahoo.messagebus.test.SimpleMessage; +import com.yahoo.messagebus.test.SimpleProtocol; +import org.hamcrest.Matcher; +import org.junit.Ignore; +import org.junit.Test; + +import java.io.Closeable; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.concurrent.BlockingDeque; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; + +import static com.yahoo.messagebus.ErrorCode.APP_FATAL_ERROR; +import static com.yahoo.messagebus.ErrorCode.SESSION_BUSY; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.MatcherAssert.assertThat; + +/** + * @author Simon Thoresen Hult + */ +public class MbusServerConformanceTest extends ServerProviderConformanceTest { + + /* Many of the "success" expectations here (may) seem odd. But this is the current behavior of the + * messagebus server. We should probably look into whether the behavior is correct in all cases. + */ + + @Override + @Test + public void testContainerNotReadyException() throws Throwable { + new TestRunner().setRequestTimeout(100, TimeUnit.MILLISECONDS) + .expectError(is(SESSION_BUSY)) + .executeAndClose(); + } + + @Override + @Test + public void testBindingSetNotFoundException() throws Throwable { + new TestRunner().expectError(is(APP_FATAL_ERROR)) + .executeAndClose(); + } + + @Override + @Test + public void testNoBindingSetSelectedException() throws Throwable { + new TestRunner().expectError(is(APP_FATAL_ERROR)) + .executeAndClose(); + } + + @Override + @Test + public void testBindingNotFoundException() throws Throwable { + new TestRunner().expectError(is(APP_FATAL_ERROR)) + .executeAndClose(); + } + + @Override + @Test + public void testRequestHandlerWithSyncCloseResponse() throws Throwable { + new TestRunner().expectSuccess() + .executeAndClose(); + } + + @Override + @Test + public void testRequestHandlerWithSyncWriteResponse() throws Throwable { + new TestRunner().expectSuccess() + .executeAndClose(); + } + + @Override + @Test + public void testRequestHandlerWithSyncHandleResponse() throws Throwable { + new TestRunner().expectSuccess() + .executeAndClose(); + } + + @Override + @Test + public void testRequestHandlerWithAsyncHandleResponse() throws Throwable { + new TestRunner().expectSuccess() + .executeAndClose(); + } + + @Override + @Test + public void testRequestException() throws Throwable { + new TestRunner().expectError(is(APP_FATAL_ERROR)) + .executeAndClose(); + } + + @Override + @Test + public void testRequestExceptionWithSyncCloseResponse() throws Throwable { + new TestRunner().expectSuccess() + .executeAndClose(); + } + + @Override + @Test + public void testRequestExceptionWithSyncWriteResponse() throws Throwable { + new TestRunner().expectSuccess() + .executeAndClose(); + } + + @Override + @Test + public void testRequestNondeterministicExceptionWithSyncHandleResponse() throws Throwable { + new TestRunner().expectSuccess() + .executeAndClose(); + } + + @Override + @Test + public void testRequestExceptionBeforeResponseWriteWithSyncHandleResponse() throws Throwable { + new TestRunner().expectSuccess() + .executeAndClose(); + } + + @Override + @Test + @Ignore // N/A: The messagebus protocol does not have content. + public void testRequestExceptionAfterResponseWriteWithSyncHandleResponse() throws Throwable { + } + + @Override + @Test + public void testRequestNondeterministicExceptionWithAsyncHandleResponse() throws Throwable { + new TestRunner().executeAndClose(); + } + + @Override + @Test + public void testRequestExceptionBeforeResponseWriteWithAsyncHandleResponse() throws Throwable { + new TestRunner().expectError(is(APP_FATAL_ERROR)) + .executeAndClose(); + } + + @Override + @Test + public void testRequestExceptionAfterResponseCloseNoContentWithAsyncHandleResponse() throws Throwable { + new TestRunner().expectSuccess() + .executeAndClose(); + } + + @Override + @Test + @Ignore // N/A: The messagebus protocol does not have content. + public void testRequestExceptionAfterResponseWriteWithAsyncHandleResponse() throws Throwable { + } + + @Override + @Test + @Ignore // N/A: The messagebus protocol does not have content. + public void testRequestContentWriteWithSyncCompletion() throws Throwable { + } + + @Override + @Test + @Ignore // N/A: The messagebus protocol does not have content. + public void testRequestContentWriteWithAsyncCompletion() throws Throwable { + } + + @Override + @Test + @Ignore // N/A: The messagebus protocol does not have content. + public void testRequestContentWriteWithNondeterministicSyncFailure() throws Throwable { + } + + @Override + @Test + @Ignore // N/A: The messagebus protocol does not have content. + public void testRequestContentWriteWithSyncFailureBeforeResponseWrite() throws Throwable { + } + + @Override + @Test + @Ignore // N/A: The messagebus protocol does not have content. + public void testRequestContentWriteWithSyncFailureAfterResponseWrite() throws Throwable { + } + + @Override + @Test + @Ignore // N/A: The messagebus protocol does not have content. + public void testRequestContentWriteWithNondeterministicAsyncFailure() throws Throwable { + } + + @Override + @Test + @Ignore // N/A: The messagebus protocol does not have content. + public void testRequestContentWriteWithAsyncFailureBeforeResponseWrite() throws Throwable { + } + + @Override + @Test + @Ignore // N/A: The messagebus protocol does not have content. + public void testRequestContentWriteWithAsyncFailureAfterResponseWrite() throws Throwable { + } + + @Override + @Test + @Ignore // N/A: The messagebus protocol does not have content. + public void testRequestContentWriteWithAsyncFailureAfterResponseCloseNoContent() throws Throwable { + } + + @Override + @Test + @Ignore // N/A: The messagebus protocol does not have content. + public void testRequestContentWriteNondeterministicException() throws Throwable { + } + + @Override + @Test + @Ignore // N/A: The messagebus protocol does not have content. + public void testRequestContentWriteExceptionBeforeResponseWrite() throws Throwable { + } + + @Override + @Test + @Ignore // N/A: The messagebus protocol does not have content. + public void testRequestContentWriteExceptionAfterResponseWrite() throws Throwable { + } + + @Override + @Test + @Ignore // N/A: The messagebus protocol does not have content. + public void testRequestContentWriteExceptionAfterResponseCloseNoContent() throws Throwable { + } + + @Override + @Test + @Ignore // N/A: The messagebus protocol does not have content. + public void testRequestContentWriteNondeterministicExceptionWithSyncCompletion() throws Throwable { + } + + @Override + @Test + @Ignore // N/A: The messagebus protocol does not have content. + public void testRequestContentWriteExceptionBeforeResponseWriteWithSyncCompletion() throws Throwable { + } + + @Override + @Test + @Ignore // N/A: The messagebus protocol does not have content. + public void testRequestContentWriteExceptionAfterResponseWriteWithSyncCompletion() throws Throwable { + } + + @Override + @Test + @Ignore // N/A: The messagebus protocol does not have content. + public void testRequestContentWriteExceptionAfterResponseCloseNoContentWithSyncCompletion() throws Throwable { + } + + @Override + @Test + @Ignore // N/A: The messagebus protocol does not have content. + public void testRequestContentWriteNondeterministicExceptionWithAsyncCompletion() throws Throwable { + } + + @Override + @Test + @Ignore // N/A: The messagebus protocol does not have content. + public void testRequestContentWriteExceptionBeforeResponseWriteWithAsyncCompletion() throws Throwable { + } + + @Override + @Test + @Ignore // N/A: The messagebus protocol does not have content. + public void testRequestContentWriteExceptionAfterResponseWriteWithAsyncCompletion() throws Throwable { + } + + @Override + @Test + @Ignore // N/A: The messagebus protocol does not have content. + public void testRequestContentWriteExceptionAfterResponseCloseNoContentWithAsyncCompletion() throws Throwable { + } + + @Override + @Test + @Ignore // N/A: The messagebus protocol does not have content. + public void testRequestContentWriteExceptionWithNondeterministicSyncFailure() throws Throwable { + } + + @Override + @Test + @Ignore // N/A: The messagebus protocol does not have content. + public void testRequestContentWriteExceptionWithSyncFailureBeforeResponseWrite() throws Throwable { + } + + @Override + @Test + @Ignore // N/A: The messagebus protocol does not have content. + public void testRequestContentWriteExceptionWithSyncFailureAfterResponseWrite() throws Throwable { + } + + @Override + @Test + @Ignore // N/A: The messagebus protocol does not have content. + public void testRequestContentWriteExceptionWithSyncFailureAfterResponseCloseNoContent() throws Throwable { + } + + @Override + @Test + @Ignore // N/A: The messagebus protocol does not have content. + public void testRequestContentWriteExceptionWithNondeterministicAsyncFailure() throws Throwable { + } + + @Override + @Test + @Ignore // N/A: The messagebus protocol does not have content. + public void testRequestContentWriteExceptionWithAsyncFailureBeforeResponseWrite() throws Throwable { + } + + @Override + @Test + @Ignore // N/A: The messagebus protocol does not have content. + public void testRequestContentWriteExceptionWithAsyncFailureAfterResponseWrite() throws Throwable { + } + + @Override + @Test + @Ignore // N/A: The messagebus protocol does not have content. + public void testRequestContentWriteExceptionWithAsyncFailureAfterResponseCloseNoContent() throws Throwable { + } + + @Override + @Test + public void testRequestContentCloseWithSyncCompletion() throws Throwable { + new TestRunner().expectSuccess() + .executeAndClose(); + } + + @Override + @Test + public void testRequestContentCloseWithAsyncCompletion() throws Throwable { + new TestRunner().expectSuccess() + .executeAndClose(); + } + + @Override + @Test + public void testRequestContentCloseWithNondeterministicSyncFailure() throws Throwable { + new TestRunner().expectSuccess() + .executeAndClose(); + } + + @Override + @Test + public void testRequestContentCloseWithSyncFailureBeforeResponseWrite() throws Throwable { + new TestRunner().expectSuccess() + .executeAndClose(); + } + + @Override + @Test + @Ignore // N/A: The messagebus protocol does not have content. + public void testRequestContentCloseWithSyncFailureAfterResponseWrite() throws Throwable { + } + + @Override + @Test + public void testRequestContentCloseWithSyncFailureAfterResponseCloseNoContent() throws Throwable { + new TestRunner().expectSuccess() + .executeAndClose(); + } + + @Override + @Test + public void testRequestContentCloseWithNondeterministicAsyncFailure() throws Throwable { + new TestRunner().expectSuccess() + .executeAndClose(); + } + + @Override + @Test + public void testRequestContentCloseWithAsyncFailureBeforeResponseWrite() throws Throwable { + new TestRunner().expectSuccess() + .executeAndClose(); + } + + @Override + @Test + @Ignore // N/A: The messagebus protocol does not have content. + public void testRequestContentCloseWithAsyncFailureAfterResponseWrite() throws Throwable { + } + + @Override + @Test + public void testRequestContentCloseWithAsyncFailureAfterResponseCloseNoContent() throws Throwable { + new TestRunner().expectSuccess() + .executeAndClose(); + } + + @Override + @Test + public void testRequestContentCloseNondeterministicException() throws Throwable { + new TestRunner().expectSuccess() + .executeAndClose(); + } + + @Override + @Test + public void testRequestContentCloseExceptionBeforeResponseWrite() throws Throwable { + new TestRunner().expectSuccess() + .executeAndClose(); + } + + @Override + @Test + @Ignore // N/A: The messagebus protocol does not have content. + public void testRequestContentCloseExceptionAfterResponseWrite() throws Throwable { + } + + @Override + @Test + public void testRequestContentCloseExceptionAfterResponseCloseNoContent() throws Throwable { + new TestRunner().expectSuccess() + .executeAndClose(); + } + + @Override + @Test + public void testRequestContentCloseNondeterministicExceptionWithSyncCompletion() throws Throwable { + new TestRunner().expectSuccess() + .executeAndClose(); + } + + @Override + @Test + public void testRequestContentCloseExceptionBeforeResponseWriteWithSyncCompletion() throws Throwable { + new TestRunner().expectSuccess() + .executeAndClose(); + } + + @Override + @Test + @Ignore // N/A: The messagebus protocol does not have content. + public void testRequestContentCloseExceptionAfterResponseWriteWithSyncCompletion() throws Throwable { + } + + @Override + @Test + public void testRequestContentCloseExceptionAfterResponseCloseNoContentWithSyncCompletion() throws Throwable { + new TestRunner().expectSuccess() + .executeAndClose(); + } + + @Override + @Test + public void testRequestContentCloseNondeterministicExceptionWithAsyncCompletion() throws Throwable { + new TestRunner().expectSuccess() + .executeAndClose(); + } + + @Override + @Test + public void testRequestContentCloseExceptionBeforeResponseWriteWithAsyncCompletion() throws Throwable { + new TestRunner().expectSuccess() + .executeAndClose(); + } + + @Override + @Test + @Ignore // N/A: The messagebus protocol does not have content. + public void testRequestContentCloseExceptionAfterResponseWriteWithAsyncCompletion() throws Throwable { + } + + @Override + @Test + public void testRequestContentCloseExceptionAfterResponseCloseNoContentWithAsyncCompletion() throws Throwable { + new TestRunner().expectSuccess() + .executeAndClose(); + } + + @Override + @Test + public void testRequestContentCloseNondeterministicExceptionWithSyncFailure() throws Throwable { + new TestRunner().expectSuccess() + .executeAndClose(); + } + + @Override + @Test + public void testRequestContentCloseExceptionBeforeResponseWriteWithSyncFailure() throws Throwable { + new TestRunner().expectSuccess() + .executeAndClose(); + } + + @Override + @Test + @Ignore // N/A: The messagebus protocol does not have content. + public void testRequestContentCloseExceptionAfterResponseWriteWithSyncFailure() throws Throwable { + } + + @Override + @Test + public void testRequestContentCloseExceptionAfterResponseCloseNoContentWithSyncFailure() throws Throwable { + new TestRunner().expectSuccess() + .executeAndClose(); + } + + @Override + @Test + public void testRequestContentCloseNondeterministicExceptionWithAsyncFailure() throws Throwable { + new TestRunner().expectSuccess() + .executeAndClose(); + } + + @Override + @Test + public void testRequestContentCloseExceptionBeforeResponseWriteWithAsyncFailure() throws Throwable { + new TestRunner().expectSuccess() + .executeAndClose(); + } + + @Override + @Test + @Ignore // N/A: The messagebus protocol does not have content. + public void testRequestContentCloseExceptionAfterResponseWriteWithAsyncFailure() throws Throwable { + } + + @Override + @Test + public void testRequestContentCloseExceptionAfterResponseCloseNoContentWithAsyncFailure() throws Throwable { + new TestRunner().expectSuccess() + .executeAndClose(); + } + + @Override + @Test + @Ignore // N/A: The messagebus protocol does not have content. + public void testResponseWriteCompletionException() throws Throwable { + } + + @Override + @Test + public void testResponseCloseCompletionException() throws Throwable { + new TestRunner().expectSuccess() + .executeAndClose(); + } + + @Override + @Test + public void testResponseCloseCompletionExceptionNoContent() throws Throwable { + new TestRunner().expectSuccess() + .executeAndClose(); + } + + private class TestRunner implements Adapter<MbusServer, MyClient, Reply> { + + final LocalWire wire = new LocalWire(); + final SharedMessageBus mbus; + final ServerSession session; + Matcher<Integer> expectedError = null; + boolean successExpected = false; + long timeoutMillis = TimeUnit.SECONDS.toMillis(60); + + TestRunner() { + this(new MessageBusParams().addProtocol(new SimpleProtocol()), + new DestinationSessionParams()); + } + + TestRunner(MessageBusParams mbusParams, DestinationSessionParams sessionParams) { + this.mbus = new SharedMessageBus(new MessageBus(new LocalNetwork(wire), mbusParams)); + this.session = mbus.newDestinationSession(sessionParams); + } + + TestRunner setRequestTimeout(long timeout, TimeUnit unit) { + timeoutMillis = unit.toMillis(timeout); + return this; + } + + TestRunner expectError(Matcher<Integer> matcher) { + assertThat(successExpected, is(false)); + expectedError = matcher; + return this; + } + + TestRunner expectSuccess() { + assertThat(expectedError, is(nullValue())); + successExpected = true; + return this; + } + + @Override + public Module newConfigModule() { + return new AbstractModule() { + + @Override + protected void configure() { + bind(ServerSession.class).toInstance(session); + } + }; + } + + @Override + public Class<MbusServer> getServerProviderClass() { + return MbusServer.class; + } + + @Override + public MyClient newClient(MbusServer server) throws Throwable { + return new MyClient(wire, server.connectionSpec()); + } + + @Override + public Reply executeRequest(MyClient client, boolean withRequestContent) throws Throwable { + // This protocol doesn't have the concept of "request content", so if we are asked to send any, it's a bug. + assertThat(withRequestContent, is(false)); + + final SimpleMessage msg = new SimpleMessage("foo"); + msg.getTrace().setLevel(9); + msg.setRoute(client.route); + msg.setTimeRemaining(timeoutMillis); + assertThat("client.session.send(msg).isAccepted()", + client.session.send(msg).isAccepted(), is(true)); + + final Reply reply = client.replies.poll(60, TimeUnit.SECONDS); + assertThat("reply != null", reply, notNullValue()); + return reply; + } + + @Override + public Iterable<ByteBuffer> newResponseContent() { + return Collections.emptyList(); + } + + @Override + public void validateResponse(Reply reply) throws Throwable { + final String trace = String.valueOf(reply.getTrace()); + if (expectedError != null) { + assertThat(reply.hasErrors(), is(true)); + final int error = reply.getError(0).getCode(); + assertThat(trace, error, expectedError); + } + if (successExpected) { + assertThat(trace, reply.hasErrors(), is(false)); + } + } + + void executeAndClose() throws Throwable { + runTest(this); + session.release(); + mbus.release(); + } + } + + public static class MyClient implements Closeable, ReplyHandler { + + final BlockingDeque<Reply> replies = new LinkedBlockingDeque<>(); + final MessageBus mbus; + final Route route; + final SourceSession session; + + MyClient(LocalWire wire, String connectionSpec) { + this.mbus = new MessageBus(new LocalNetwork(wire), + new MessageBusParams().addProtocol(new SimpleProtocol())); + this.session = mbus.createSourceSession(new SourceSessionParams().setReplyHandler(this)); + this.route = Route.parse(connectionSpec); + } + + @Override + public void close() throws IOException { + session.destroy(); + mbus.destroy(); + } + + @Override + public void handleReply(Reply reply) { + replies.addLast(reply); + } + } +} diff --git a/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusServerTestCase.java b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusServerTestCase.java new file mode 100644 index 00000000000..9d45d2e7abf --- /dev/null +++ b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusServerTestCase.java @@ -0,0 +1,374 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.jdisc; + +import com.google.inject.AbstractModule; +import com.yahoo.jdisc.Request; +import com.yahoo.jdisc.ResourceReference; +import com.yahoo.jdisc.Response; +import com.yahoo.jdisc.application.BindingSetSelector; +import com.yahoo.jdisc.handler.*; +import com.yahoo.messagebus.Error; +import com.yahoo.messagebus.*; +import com.yahoo.messagebus.jdisc.test.ServerTestDriver; +import com.yahoo.messagebus.shared.ServerSession; +import com.yahoo.messagebus.test.SimpleMessage; +import com.yahoo.messagebus.test.SimpleReply; +import org.junit.Test; + +import java.net.URI; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.*; + +/** + * @author Simon Thoresen Hult + */ +public class MbusServerTestCase { + + @Test + public void requireThatServerRetainsSession() { + MySession session = new MySession(); + assertEquals(1, session.refCount); + MbusServer server = new MbusServer(null, session); + assertEquals(2, session.refCount); + session.release(); + assertEquals(1, session.refCount); + server.destroy(); + assertEquals(0, session.refCount); + } + + @Test + public void requireThatNoBindingSetSelectedExceptionIsCaught() { + ServerTestDriver driver = ServerTestDriver.newUnboundInstance(true, new MySelector(null)); + assertTrue(driver.sendMessage(new SimpleMessage("foo"))); + assertNotNull(driver.awaitErrors(ErrorCode.APP_FATAL_ERROR)); + assertTrue(driver.close()); + } + + @Test + public void requireThatBindingSetNotFoundExceptionIsCaught() { + ServerTestDriver driver = ServerTestDriver.newUnboundInstance(true, new MySelector("foo")); + assertTrue(driver.sendMessage(new SimpleMessage("bar"))); + assertNotNull(driver.awaitErrors(ErrorCode.APP_FATAL_ERROR)); + assertTrue(driver.close()); + } + + @Test + public void requireThatContainerNotReadyExceptionIsCaught() { + ServerTestDriver driver = ServerTestDriver.newInactiveInstance(true); + assertTrue(driver.sendMessage(new SimpleMessage("foo"))); + assertNotNull(driver.awaitErrors(ErrorCode.APP_FATAL_ERROR)); + assertTrue(driver.close()); + } + + @Test + public void requireThatBindingNotFoundExceptionIsCaught() { + ServerTestDriver driver = ServerTestDriver.newUnboundInstance(true); + assertTrue(driver.sendMessage(new SimpleMessage("foo"))); + assertNotNull(driver.awaitErrors(ErrorCode.APP_FATAL_ERROR)); + assertTrue(driver.close()); + } + + @Test + public void requireThatRequestDeniedExceptionIsCaught() { + ServerTestDriver driver = ServerTestDriver.newInstance(MyRequestHandler.newRequestDenied(), true); + assertTrue(driver.sendMessage(new SimpleMessage("foo"))); + assertNotNull(driver.awaitErrors(ErrorCode.APP_FATAL_ERROR)); + assertTrue(driver.close()); + } + + @Test + public void requireThatRequestResponseWorks() { + MyRequestHandler requestHandler = MyRequestHandler.newInstance(); + ServerTestDriver driver = ServerTestDriver.newInstance(requestHandler, true); + assertTrue(driver.sendMessage(new SimpleMessage("foo"))); + + assertNotNull(requestHandler.awaitRequest()); + assertTrue(requestHandler.sendResponse(new Response(Response.Status.OK))); + + assertNotNull(driver.awaitSuccess()); + assertTrue(driver.close()); + } + + @Test + public void requireThatRequestIsMbus() { + MyRequestHandler requestHandler = MyRequestHandler.newInstance(); + ServerTestDriver driver = ServerTestDriver.newInstance(requestHandler, true); + assertTrue(driver.sendMessage(new SimpleMessage("foo"))); + + Request request = requestHandler.awaitRequest(); + assertTrue(request instanceof MbusRequest); + Message msg = ((MbusRequest)request).getMessage(); + assertTrue(msg instanceof SimpleMessage); + assertEquals("foo", ((SimpleMessage)msg).getValue()); + assertTrue(requestHandler.sendResponse(new Response(Response.Status.OK))); + + assertNotNull(driver.awaitSuccess()); + assertTrue(driver.close()); + } + + @Test + public void requireThatReplyInsideMbusResponseIsUsed() { + MyRequestHandler requestHandler = MyRequestHandler.newInstance(); + ServerTestDriver driver = ServerTestDriver.newInstance(requestHandler, true); + assertTrue(driver.sendMessage(new SimpleMessage("foo"))); + + assertNotNull(requestHandler.awaitRequest()); + Reply reply = new SimpleReply("bar"); + reply.swapState(((MbusRequest)requestHandler.request).getMessage()); + assertTrue(requestHandler.sendResponse(new MbusResponse(Response.Status.OK, reply))); + + reply = driver.awaitSuccess(); + assertTrue(reply instanceof SimpleReply); + assertEquals("bar", ((SimpleReply)reply).getValue()); + + assertTrue(driver.close()); + } + + @Test + public void requireThatNonMbusResponseCausesEmptyReply() { + MyRequestHandler requestHandler = MyRequestHandler.newInstance(); + ServerTestDriver driver = ServerTestDriver.newInstance(requestHandler, true); + assertTrue(driver.sendMessage(new SimpleMessage("foo"))); + + assertNotNull(requestHandler.awaitRequest()); + assertTrue(requestHandler.sendResponse(new Response(Response.Status.OK))); + + assertNotNull(driver.awaitSuccess()); + assertTrue(driver.close()); + } + + @Test + public void requireThatMbusRequestContentCallsCompletion() throws InterruptedException { + MyRequestHandler requestHandler = MyRequestHandler.newInstance(); + ServerTestDriver driver = ServerTestDriver.newInstance(requestHandler, true); + assertTrue(driver.sendMessage(new SimpleMessage("foo"))); + + assertNotNull(requestHandler.awaitRequest()); + ContentChannel content = requestHandler.responseHandler.handleResponse(new Response(Response.Status.OK)); + assertNotNull(content); + MyCompletion completion = new MyCompletion(); + content.close(completion); + assertTrue(completion.completedLatch.await(60, TimeUnit.SECONDS)); + + assertNotNull(driver.awaitSuccess()); + assertTrue(driver.close()); + } + + @Test + public void requireThatResponseContentDoesNotSupportWrite() { + MyRequestHandler requestHandler = MyRequestHandler.newInstance(); + ServerTestDriver driver = ServerTestDriver.newInstance(requestHandler, true); + assertTrue(driver.sendMessage(new SimpleMessage("foo"))); + + assertNotNull(requestHandler.awaitRequest()); + ContentChannel content = requestHandler.responseHandler.handleResponse(new Response(Response.Status.OK)); + assertNotNull(content); + try { + content.write(ByteBuffer.allocate(69), null); + fail(); + } catch (UnsupportedOperationException e) { + + } + content.close(null); + + assertNotNull(driver.awaitSuccess()); + assertTrue(driver.close()); + } + + @Test + public void requireThatResponseErrorCodeDoesNotDuplicateReplyError() { + assertError(Collections.<Integer>emptyList(), + Response.Status.OK); + assertError(Arrays.asList(ErrorCode.APP_FATAL_ERROR), + Response.Status.BAD_REQUEST); + assertError(Arrays.asList(ErrorCode.FATAL_ERROR), + Response.Status.BAD_REQUEST, ErrorCode.FATAL_ERROR); + assertError(Arrays.asList(ErrorCode.TRANSIENT_ERROR, ErrorCode.APP_FATAL_ERROR), + Response.Status.BAD_REQUEST, ErrorCode.TRANSIENT_ERROR); + assertError(Arrays.asList(ErrorCode.FATAL_ERROR, ErrorCode.TRANSIENT_ERROR), + Response.Status.BAD_REQUEST, ErrorCode.FATAL_ERROR, ErrorCode.TRANSIENT_ERROR); + } + + private static void assertError(List<Integer> expectedErrors, int responseStatus, int... responseErrors) { + MyRequestHandler requestHandler = MyRequestHandler.newInstance(); + ServerTestDriver driver = ServerTestDriver.newInstance(requestHandler, true); + assertTrue(driver.sendMessage(new SimpleMessage("foo"))); + + assertNotNull(requestHandler.awaitRequest()); + Reply reply = new SimpleReply("bar"); + reply.swapState(((MbusRequest)requestHandler.request).getMessage()); + for (int err : responseErrors) { + reply.addError(new Error(err, "err")); + } + assertTrue(requestHandler.sendResponse(new MbusResponse(responseStatus, reply))); + + assertNotNull(reply = driver.awaitReply()); + List<Integer> actual = new LinkedList<>(); + for (int i = 0; i < reply.getNumErrors(); ++i) { + actual.add(reply.getError(i).getCode()); + } + assertEquals(expectedErrors, actual); + assertTrue(driver.close()); + } + + private static class MySelector extends AbstractModule implements BindingSetSelector { + + final String bindingSet; + + MySelector(String bindingSet) { + this.bindingSet = bindingSet; + } + + @Override + protected void configure() { + bind(BindingSetSelector.class).toInstance(this); + } + + @Override + public String select(URI uri) { + return bindingSet; + } + } + + private static class MyRequestHandler extends AbstractRequestHandler { + + final MyRequestContent content; + Request request; + ResponseHandler responseHandler; + + MyRequestHandler(MyRequestContent content) { + this.content = content; + } + + @Override + public ContentChannel handleRequest(Request request, ResponseHandler responseHandler) { + this.request = request; + this.responseHandler = responseHandler; + if (content == null) { + throw new RequestDeniedException(request); + } + return content; + } + + Request awaitRequest() { + try { + if (!content.closeLatch.await(60, TimeUnit.SECONDS)) { + return null; + } + } catch (InterruptedException e) { + throw new IllegalStateException(e); + } + if (request instanceof MbusRequest) { + ((MbusRequest)request).getMessage().getTrace().trace(0, "Request received by DISC."); + } + return request; + } + + boolean sendResponse(Response response) { + ContentChannel content = responseHandler.handleResponse(response); + if (content == null) { + return false; + } + content.close(null); + return true; + } + + static MyRequestHandler newInstance() { + return new MyRequestHandler(new MyRequestContent()); + } + + static MyRequestHandler newRequestDenied() { + return new MyRequestHandler(null); + } + } + + private static class MyRequestContent implements ContentChannel { + + final CountDownLatch writeLatch = new CountDownLatch(1); + final CountDownLatch closeLatch = new CountDownLatch(1); + + @Override + public void write(ByteBuffer buf, CompletionHandler handler) { + if (handler != null) { + handler.completed(); + } + writeLatch.countDown(); + } + + @Override + public void close(CompletionHandler handler) { + if (handler != null) { + handler.completed(); + } + closeLatch.countDown(); + } + } + + private static class MyCompletion implements CompletionHandler { + + final CountDownLatch completedLatch = new CountDownLatch(1); + + @Override + public void completed() { + completedLatch.countDown(); + } + + @Override + public void failed(Throwable t) { + + } + } + + private static class MySession implements ServerSession { + + int refCount = 1; + + @Override + public void sendReply(Reply reply) { + + } + + @Override + public MessageHandler getMessageHandler() { + return null; + } + + @Override + public void setMessageHandler(MessageHandler msgHandler) { + + } + + @Override + public String connectionSpec() { + return null; + } + + @Override + public String name() { + return null; + } + + @Override + public ResourceReference refer() { + ++refCount; + return new ResourceReference() { + @Override + public void close() { + --refCount; + } + }; + } + + @Override + public void release() { + --refCount; + } + } +} diff --git a/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/ServerThreadingTestCase.java b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/ServerThreadingTestCase.java new file mode 100644 index 00000000000..a7ee355094f --- /dev/null +++ b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/ServerThreadingTestCase.java @@ -0,0 +1,137 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.jdisc; + +import com.yahoo.jdisc.application.ContainerBuilder; +import com.yahoo.jdisc.service.CurrentContainer; +import com.yahoo.jdisc.test.TestDriver; +import com.yahoo.messagebus.DestinationSessionParams; +import com.yahoo.messagebus.EmptyReply; +import com.yahoo.messagebus.Message; +import com.yahoo.messagebus.MessageBus; +import com.yahoo.messagebus.MessageBusParams; +import com.yahoo.messagebus.Reply; +import com.yahoo.messagebus.ReplyHandler; +import com.yahoo.messagebus.SourceSession; +import com.yahoo.messagebus.SourceSessionParams; +import com.yahoo.messagebus.network.local.LocalNetwork; +import com.yahoo.messagebus.network.local.LocalWire; +import com.yahoo.messagebus.routing.Route; +import com.yahoo.messagebus.shared.SharedDestinationSession; +import com.yahoo.messagebus.shared.SharedMessageBus; +import com.yahoo.messagebus.test.SimpleMessage; +import com.yahoo.messagebus.test.SimpleProtocol; +import org.junit.Test; + +import java.util.concurrent.BlockingDeque; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + +/** + * @author Simon Thoresen Hult + */ +public class ServerThreadingTestCase { + + private static final int NUM_THREADS = 32; + private static final int NUM_REQUESTS = 1000; + + @Test + public void requireThatServerIsThreadSafe() throws Exception { + final LocalWire wire = new LocalWire(); + final Client client = new Client(wire); + final Server server = new Server(wire); + + for (int i = 0; i < NUM_REQUESTS; ++i) { + final Message msg = new SimpleMessage("foo"); + msg.setRoute(Route.parse(server.delegate.connectionSpec())); + msg.pushHandler(client); + assertThat(client.session.send(msg).isAccepted(), is(true)); + } + for (int i = 0; i < NUM_REQUESTS; ++i) { + final Reply reply = client.replies.poll(600, TimeUnit.SECONDS); + assertThat(reply, instanceOf(EmptyReply.class)); + assertThat(reply.hasErrors(), is(false)); + } + + assertThat(client.close(), is(true)); + assertThat(server.close(), is(true)); + } + + private static class Client implements ReplyHandler { + + final BlockingDeque<Reply> replies = new LinkedBlockingDeque<>(); + final MessageBus mbus; + final SourceSession session; + + Client(final LocalWire wire) { + mbus = new MessageBus( + new LocalNetwork(wire), + new MessageBusParams().addProtocol(new SimpleProtocol())); + session = mbus.createSourceSession( + new SourceSessionParams() + .setReplyHandler(this) + .setThrottlePolicy(null)); + } + + @Override + public void handleReply(final Reply reply) { + replies.addLast(reply); + } + + boolean close() { + return session.destroy() && mbus.destroy(); + } + } + + private static class Server extends MbusRequestHandler { + + final Executor executor = Executors.newFixedThreadPool(NUM_THREADS); + final MbusServer delegate; + final TestDriver driver; + + Server(final LocalWire wire) { + driver = TestDriver.newSimpleApplicationInstanceWithoutOsgi(); + delegate = newMbusServer(driver, wire); + + final ContainerBuilder builder = driver.newContainerBuilder(); + builder.serverBindings().bind("mbus://*/*", this); + driver.activateContainer(builder); + delegate.start(); + } + + @Override + public void handleMessage(final Message msg) { + executor.execute(new Runnable() { + + @Override + public void run() { + final Reply reply = new EmptyReply(); + reply.swapState(msg); + reply.popHandler().handleReply(reply); + } + }); + } + + boolean close() { + delegate.release(); + return driver.close(); + } + } + + private static MbusServer newMbusServer(final CurrentContainer container, final LocalWire wire) { + final SharedMessageBus mbus = new SharedMessageBus(new MessageBus( + new LocalNetwork(wire), + new MessageBusParams().addProtocol(new SimpleProtocol()))); + final SharedDestinationSession session = mbus.newDestinationSession( + new DestinationSessionParams()); + final MbusServer server = new MbusServer(container, session); + session.release(); + mbus.release(); + return server; + } +} diff --git a/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/test/ClientTestDriverTestCase.java b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/test/ClientTestDriverTestCase.java new file mode 100644 index 00000000000..ef290a070cb --- /dev/null +++ b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/test/ClientTestDriverTestCase.java @@ -0,0 +1,32 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.jdisc.test; + +import com.yahoo.jrt.ListenFailedException; +import com.yahoo.jrt.slobrok.server.Slobrok; +import com.yahoo.messagebus.test.SimpleProtocol; +import org.junit.Test; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +/** + * @author Simon Thoresen Hult + */ +public class ClientTestDriverTestCase { + + @Test + public void requireThatFactoryMethodsWork() throws ListenFailedException { + ClientTestDriver driver = ClientTestDriver.newInstance(); + assertNotNull(driver); + assertTrue(driver.close()); + + driver = ClientTestDriver.newInstanceWithProtocol(new SimpleProtocol()); + assertNotNull(driver); + assertTrue(driver.close()); + + Slobrok slobrok = new Slobrok(); + driver = ClientTestDriver.newInstanceWithExternSlobrok(slobrok.configId()); + assertNotNull(driver); + assertTrue(driver.close()); + } +} diff --git a/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/test/ServerTestDriverTestCase.java b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/test/ServerTestDriverTestCase.java new file mode 100644 index 00000000000..f6ae2335d12 --- /dev/null +++ b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/test/ServerTestDriverTestCase.java @@ -0,0 +1,34 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.jdisc.test; + +import com.yahoo.jdisc.test.NonWorkingRequestHandler; +import com.yahoo.jrt.ListenFailedException; +import com.yahoo.jrt.slobrok.server.Slobrok; +import com.yahoo.messagebus.test.SimpleProtocol; +import org.junit.Test; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +/** + * @author Simon Thoresen Hult + */ +public class ServerTestDriverTestCase { + + @Test + public void requireThatFactoryMethodsWork() throws ListenFailedException { + ServerTestDriver driver = ServerTestDriver.newInstance(new NonWorkingRequestHandler(), false); + assertNotNull(driver); + assertTrue(driver.close()); + + driver = ServerTestDriver.newInstanceWithProtocol(new SimpleProtocol(), new NonWorkingRequestHandler(), false); + assertNotNull(driver); + assertTrue(driver.close()); + + Slobrok slobrok = new Slobrok(); + driver = ServerTestDriver.newInstanceWithExternSlobrok(slobrok.configId(), new NonWorkingRequestHandler(), false); + assertNotNull(driver); + assertTrue(driver.close()); + } + +} diff --git a/container-messagebus/src/test/java/com/yahoo/messagebus/shared/SharedDestinationSessionTestCase.java b/container-messagebus/src/test/java/com/yahoo/messagebus/shared/SharedDestinationSessionTestCase.java new file mode 100644 index 00000000000..78e79da4b9f --- /dev/null +++ b/container-messagebus/src/test/java/com/yahoo/messagebus/shared/SharedDestinationSessionTestCase.java @@ -0,0 +1,134 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.shared; + +import com.yahoo.jrt.ListenFailedException; +import com.yahoo.jrt.slobrok.server.Slobrok; +import com.yahoo.messagebus.*; +import com.yahoo.messagebus.jdisc.test.MessageQueue; +import com.yahoo.messagebus.jdisc.test.RemoteClient; +import com.yahoo.messagebus.jdisc.test.ReplyQueue; +import com.yahoo.messagebus.network.rpc.RPCNetworkParams; +import com.yahoo.messagebus.routing.Route; +import com.yahoo.messagebus.test.SimpleMessage; +import com.yahoo.messagebus.test.SimpleProtocol; +import com.yahoo.messagebus.test.SimpleReply; +import org.junit.Test; + +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.*; + +/** + * @author Simon Thoresen Hult + */ +public class SharedDestinationSessionTestCase { + + @Test + public void requireThatMessageHandlerCanBeAccessed() { + SharedDestinationSession session = newDestinationSession(); + assertNull(session.getMessageHandler()); + + MessageQueue handler = new MessageQueue(); + session.setMessageHandler(handler); + assertSame(handler, session.getMessageHandler()); + } + + @Test + public void requireThatMessageHandlerCanOnlyBeSetOnce() { + SharedDestinationSession session = newDestinationSession(); + session.setMessageHandler(new MessageQueue()); + try { + session.setMessageHandler(new MessageQueue()); + fail(); + } catch (IllegalStateException e) { + assertEquals("Message handler already registered.", e.getMessage()); + } + session.release(); + } + + @Test + public void requireThatMessageHandlerIsCalled() throws InterruptedException { + SharedDestinationSession session = newDestinationSession(); + MessageQueue queue = new MessageQueue(); + session.setMessageHandler(queue); + session.handleMessage(new SimpleMessage("foo")); + assertNotNull(queue.awaitMessage(60, TimeUnit.SECONDS)); + session.release(); + } + + @Test + public void requireThatSessionRepliesIfMessageHandlerIsNull() throws InterruptedException { + SharedDestinationSession session = newDestinationSession(); + Message msg = new SimpleMessage("foo"); + ReplyQueue queue = new ReplyQueue(); + msg.pushHandler(queue); + session.handleMessage(msg); + Reply reply = queue.awaitReply(60, TimeUnit.SECONDS); + assertNotNull(reply); + assertEquals(1, reply.getNumErrors()); + assertEquals(ErrorCode.SESSION_BUSY, reply.getError(0).getCode()); + session.release(); + } + + @Test + public void requireThatSessionIsClosedOnDestroy() { + SharedDestinationSession session = newDestinationSession(); + session.release(); + assertFalse("DestinationSession not destroyed by release().", session.session().destroy()); + } + + @Test + public void requireThatMbusIsReleasedOnDestroy() { + Slobrok slobrok = null; + try { + slobrok = new Slobrok(); + } catch (ListenFailedException e) { + fail(); + } + RPCNetworkParams netParams = new RPCNetworkParams().setSlobrokConfigId(slobrok.configId()); + SharedMessageBus mbus = SharedMessageBus.newInstance(new MessageBusParams(), netParams); + SharedDestinationSession session = mbus.newDestinationSession(new DestinationSessionParams()); + mbus.release(); + session.release(); + assertFalse("MessageBus not destroyed by release().", mbus.messageBus().destroy()); + } + + @Test + public void requireThatSessionCanSendReply() throws InterruptedException { + RemoteClient client = RemoteClient.newInstanceWithInternSlobrok(true); + MessageQueue queue = new MessageQueue(); + DestinationSessionParams params = new DestinationSessionParams().setMessageHandler(queue); + SharedDestinationSession session = newDestinationSession(client.slobrokId(), params); + Route route = Route.parse(session.connectionSpec()); + + assertTrue(client.sendMessage(new SimpleMessage("foo").setRoute(route)).isAccepted()); + Message msg = queue.awaitMessage(60, TimeUnit.SECONDS); + assertNotNull(msg); + Reply reply = new SimpleReply("bar"); + reply.swapState(msg); + session.sendReply(reply); + assertNotNull(client.awaitReply(60, TimeUnit.SECONDS)); + + session.release(); + client.close(); + } + + private static SharedDestinationSession newDestinationSession() { + Slobrok slobrok = null; + try { + slobrok = new Slobrok(); + } catch (ListenFailedException e) { + fail(); + } + return newDestinationSession(slobrok.configId(), new DestinationSessionParams()); + } + + private static SharedDestinationSession newDestinationSession(String slobrokId, DestinationSessionParams params) { + RPCNetworkParams netParams = new RPCNetworkParams().setSlobrokConfigId(slobrokId); + MessageBusParams mbusParams = new MessageBusParams().addProtocol(new SimpleProtocol()); + SharedMessageBus mbus = SharedMessageBus.newInstance(mbusParams, netParams); + SharedDestinationSession session = mbus.newDestinationSession(params); + mbus.release(); + return session; + } +} diff --git a/container-messagebus/src/test/java/com/yahoo/messagebus/shared/SharedIntermediateSessionTestCase.java b/container-messagebus/src/test/java/com/yahoo/messagebus/shared/SharedIntermediateSessionTestCase.java new file mode 100644 index 00000000000..87958415149 --- /dev/null +++ b/container-messagebus/src/test/java/com/yahoo/messagebus/shared/SharedIntermediateSessionTestCase.java @@ -0,0 +1,174 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.shared; + +import com.yahoo.jrt.ListenFailedException; +import com.yahoo.jrt.slobrok.server.Slobrok; +import com.yahoo.messagebus.*; +import com.yahoo.messagebus.jdisc.test.MessageQueue; +import com.yahoo.messagebus.jdisc.test.RemoteClient; +import com.yahoo.messagebus.jdisc.test.RemoteServer; +import com.yahoo.messagebus.jdisc.test.ReplyQueue; +import com.yahoo.messagebus.network.local.LocalNetwork; +import com.yahoo.messagebus.network.local.LocalWire; +import com.yahoo.messagebus.network.rpc.RPCNetworkParams; +import com.yahoo.messagebus.routing.Route; +import com.yahoo.messagebus.test.SimpleMessage; +import com.yahoo.messagebus.test.SimpleProtocol; +import com.yahoo.messagebus.test.SimpleReply; +import org.junit.Test; + +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.*; + +/** + * @author Simon Thoresen Hult + */ +public class SharedIntermediateSessionTestCase { + + @Test + public void requireThatMessageHandlerCanBeAccessed() { + SharedIntermediateSession session = newIntermediateSession(false); + assertNull(session.getMessageHandler()); + + MessageQueue handler = new MessageQueue(); + session.setMessageHandler(handler); + assertSame(handler, session.getMessageHandler()); + } + + @Test + public void requireThatMessageHandlerCanOnlyBeSetOnce() { + SharedIntermediateSession session = newIntermediateSession(false); + session.setMessageHandler(new MessageQueue()); + try { + session.setMessageHandler(new MessageQueue()); + fail(); + } catch (IllegalStateException e) { + assertEquals("Message handler already registered.", e.getMessage()); + } + session.release(); + } + + @Test + public void requireThatMessageHandlerIsCalled() throws InterruptedException { + SharedIntermediateSession session = newIntermediateSession(false); + MessageQueue queue = new MessageQueue(); + session.setMessageHandler(queue); + session.handleMessage(new SimpleMessage("foo")); + assertNotNull(queue.awaitMessage(60, TimeUnit.SECONDS)); + session.release(); + } + + @Test + public void requireThatSessionRepliesIfMessageHandlerIsNull() throws InterruptedException { + SharedIntermediateSession session = newIntermediateSession(false); + Message msg = new SimpleMessage("foo"); + ReplyQueue queue = new ReplyQueue(); + msg.pushHandler(queue); + session.handleMessage(msg); + Reply reply = queue.awaitReply(60, TimeUnit.SECONDS); + assertNotNull(reply); + assertEquals(1, reply.getNumErrors()); + assertEquals(ErrorCode.SESSION_BUSY, reply.getError(0).getCode()); + session.release(); + } + + @Test + public void requireThatReplyHandlerCanNotBeSet() throws ListenFailedException { + Slobrok slobrok = new Slobrok(); + try { + newIntermediateSession(slobrok.configId(), + new IntermediateSessionParams().setReplyHandler(new ReplyQueue()), + false); + fail(); + } catch (IllegalArgumentException e) { + assertEquals("Reply handler must be null.", e.getMessage()); + } + } + + @Test + public void requireThatSessionIsClosedOnDestroy() { + SharedIntermediateSession session = newIntermediateSession(false); + session.release(); + assertFalse("IntermediateSession not destroyed by release().", session.session().destroy()); + } + + @Test + public void requireThatMbusIsReleasedOnDestroy() { + try { + new Slobrok(); + } catch (ListenFailedException e) { + fail(); + } + SharedMessageBus mbus = new SharedMessageBus(new MessageBus(new LocalNetwork(new LocalWire()), new MessageBusParams())); + + SharedIntermediateSession session = mbus.newIntermediateSession(new IntermediateSessionParams()); + mbus.release(); + session.release(); + assertFalse("MessageBus not destroyed by release().", mbus.messageBus().destroy()); + } + + @Test + public void requireThatSessionCanSendMessage() throws InterruptedException { + RemoteServer server = RemoteServer.newInstanceWithInternSlobrok(); + SharedIntermediateSession session = newIntermediateSession(server.slobrokId(), + new IntermediateSessionParams(), + true); + ReplyQueue queue = new ReplyQueue(); + Message msg = new SimpleMessage("foo").setRoute(Route.parse(server.connectionSpec())); + msg.setTimeReceivedNow(); + msg.setTimeRemaining(60000); + msg.getTrace().setLevel(9); + msg.pushHandler(queue); + assertTrue(session.sendMessage(msg).isAccepted()); + assertNotNull(msg = server.awaitMessage(60, TimeUnit.SECONDS)); + server.ackMessage(msg); + assertNotNull(queue.awaitReply(60, TimeUnit.SECONDS)); + + session.release(); + server.close(); + } + + @Test + public void requireThatSessionCanSendReply() throws InterruptedException { + RemoteClient client = RemoteClient.newInstanceWithInternSlobrok(true); + MessageQueue queue = new MessageQueue(); + IntermediateSessionParams params = new IntermediateSessionParams().setMessageHandler(queue); + SharedIntermediateSession session = newIntermediateSession(client.slobrokId(), params, true); + Route route = Route.parse(session.connectionSpec()); + + assertTrue(client.sendMessage(new SimpleMessage("foo").setRoute(route)).isAccepted()); + Message msg = queue.awaitMessage(60, TimeUnit.SECONDS); + assertNotNull(msg); + Reply reply = new SimpleReply("bar"); + reply.swapState(msg); + session.sendReply(reply); + assertNotNull(client.awaitReply(60, TimeUnit.SECONDS)); + + session.release(); + client.close(); + } + + private static SharedIntermediateSession newIntermediateSession(boolean network) { + Slobrok slobrok = null; + try { + slobrok = new Slobrok(); + } catch (ListenFailedException e) { + fail(); + } + return newIntermediateSession(slobrok.configId(), new IntermediateSessionParams(), network); + } + + private static SharedIntermediateSession newIntermediateSession(String slobrokId, + IntermediateSessionParams params, + boolean network) { + RPCNetworkParams netParams = new RPCNetworkParams().setSlobrokConfigId(slobrokId); + MessageBusParams mbusParams = new MessageBusParams().addProtocol(new SimpleProtocol()); + SharedMessageBus mbus = network + ? SharedMessageBus.newInstance(mbusParams, netParams) + : new SharedMessageBus(new MessageBus(new LocalNetwork(new LocalWire()), mbusParams)); + SharedIntermediateSession session = mbus.newIntermediateSession(params); + mbus.release(); + return session; + } +} diff --git a/container-messagebus/src/test/java/com/yahoo/messagebus/shared/SharedMessageBusTestCase.java b/container-messagebus/src/test/java/com/yahoo/messagebus/shared/SharedMessageBusTestCase.java new file mode 100644 index 00000000000..a54489a89e6 --- /dev/null +++ b/container-messagebus/src/test/java/com/yahoo/messagebus/shared/SharedMessageBusTestCase.java @@ -0,0 +1,37 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.shared; + +import com.yahoo.jrt.ListenFailedException; +import com.yahoo.jrt.slobrok.server.Slobrok; +import com.yahoo.messagebus.MessageBusParams; +import com.yahoo.messagebus.network.rpc.RPCNetworkParams; +import org.junit.Test; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.fail; + +/** + * @author Simon Thoresen Hult + */ +public class SharedMessageBusTestCase { + + @Test + public void requireThatMbusCanNotBeNull() { + try { + new SharedMessageBus(null); + fail(); + } catch (NullPointerException e) { + // expected + } + } + + @Test + public void requireThatMbusIsClosedOnDestroy() throws ListenFailedException { + Slobrok slobrok = new Slobrok(); + SharedMessageBus mbus = SharedMessageBus.newInstance(new MessageBusParams(), + new RPCNetworkParams() + .setSlobrokConfigId(slobrok.configId())); + mbus.release(); + assertFalse(mbus.messageBus().destroy()); + } +} diff --git a/container-messagebus/src/test/java/com/yahoo/messagebus/shared/SharedSourceSessionTestCase.java b/container-messagebus/src/test/java/com/yahoo/messagebus/shared/SharedSourceSessionTestCase.java new file mode 100644 index 00000000000..1f0966fc961 --- /dev/null +++ b/container-messagebus/src/test/java/com/yahoo/messagebus/shared/SharedSourceSessionTestCase.java @@ -0,0 +1,94 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.shared; + +import com.yahoo.jrt.ListenFailedException; +import com.yahoo.jrt.slobrok.server.Slobrok; +import com.yahoo.messagebus.Message; +import com.yahoo.messagebus.MessageBusParams; +import com.yahoo.messagebus.SourceSessionParams; +import com.yahoo.messagebus.jdisc.test.RemoteServer; +import com.yahoo.messagebus.jdisc.test.ReplyQueue; +import com.yahoo.messagebus.network.rpc.RPCNetworkParams; +import com.yahoo.messagebus.routing.Route; +import com.yahoo.messagebus.test.SimpleMessage; +import com.yahoo.messagebus.test.SimpleProtocol; +import org.junit.Test; + +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.*; + +/** + * @author Simon Thoresen Hult + */ +public class SharedSourceSessionTestCase { + + @Test + public void requireThatReplyHandlerCanNotBeSet() { + try { + newSourceSession(new SourceSessionParams().setReplyHandler(new ReplyQueue())); + fail(); + } catch (IllegalArgumentException e) { + assertEquals("Reply handler must be null.", e.getMessage()); + } + } + + @Test + public void requireThatSessionIsClosedOnDestroy() { + SharedSourceSession session = newSourceSession(new SourceSessionParams()); + session.release(); + assertFalse("SourceSession not destroyed by release().", session.session().destroy()); + } + + @Test + public void requireThatMbusIsReleasedOnDestroy() { + Slobrok slobrok = null; + try { + slobrok = new Slobrok(); + } catch (ListenFailedException e) { + fail(); + } + RPCNetworkParams netParams = new RPCNetworkParams().setSlobrokConfigId(slobrok.configId()); + SharedMessageBus mbus = SharedMessageBus.newInstance(new MessageBusParams(), netParams); + SharedSourceSession session = mbus.newSourceSession(new SourceSessionParams()); + mbus.release(); + session.release(); + assertFalse("MessageBus not destroyed by release().", mbus.messageBus().destroy()); + } + + @Test + public void requireThatSessionCanSendMessage() throws InterruptedException { + RemoteServer server = RemoteServer.newInstanceWithInternSlobrok(); + SharedSourceSession session = newSourceSession(server.slobrokId(), + new SourceSessionParams()); + ReplyQueue queue = new ReplyQueue(); + Message msg = new SimpleMessage("foo").setRoute(Route.parse(server.connectionSpec())); + msg.pushHandler(queue); + assertTrue(session.sendMessage(msg).isAccepted()); + assertNotNull(msg = server.awaitMessage(60, TimeUnit.SECONDS)); + server.ackMessage(msg); + assertNotNull(queue.awaitReply(60, TimeUnit.SECONDS)); + + session.release(); + server.close(); + } + + private static SharedSourceSession newSourceSession(SourceSessionParams params) { + Slobrok slobrok = null; + try { + slobrok = new Slobrok(); + } catch (ListenFailedException e) { + fail(); + } + return newSourceSession(slobrok.configId(), params); + } + + private static SharedSourceSession newSourceSession(String slobrokId, SourceSessionParams params) { + RPCNetworkParams netParams = new RPCNetworkParams().setSlobrokConfigId(slobrokId); + MessageBusParams mbusParams = new MessageBusParams().addProtocol(new SimpleProtocol()); + SharedMessageBus mbus = SharedMessageBus.newInstance(mbusParams, netParams); + SharedSourceSession session = mbus.newSourceSession(params); + mbus.release(); + return session; + } +} |