aboutsummaryrefslogtreecommitdiffstats
path: root/container-messagebus
diff options
context:
space:
mode:
authorgjoranv <gv@verizonmedia.com>2021-03-29 21:14:20 +0200
committergjoranv <gv@verizonmedia.com>2021-03-29 21:14:20 +0200
commit2c7ee7bc9d3e588f11ce23d66f8c111109c8a2fc (patch)
tree148ff85b5ea211859cdc863ab24fe568060e78d0 /container-messagebus
parent09cbb21dbfba2c80660945c84b1b2d9a0fffaf24 (diff)
Add source code from jdisc_mbus_service into container-messagebus.
Diffstat (limited to 'container-messagebus')
-rw-r--r--container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/IgnoredCompletionHandler.java22
-rw-r--r--container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusClient.java147
-rw-r--r--container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusRequest.java38
-rw-r--r--container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusRequestHandler.java59
-rw-r--r--container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusResponse.java25
-rw-r--r--container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusServer.java135
-rw-r--r--container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/StatusCodes.java77
-rw-r--r--container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/package-info.java5
-rw-r--r--container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/ClientTestDriver.java134
-rw-r--r--container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/MessageQueue.java27
-rw-r--r--container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteClient.java76
-rw-r--r--container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteServer.java87
-rw-r--r--container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/ReplyQueue.java26
-rw-r--r--container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/ServerTestDriver.java155
-rw-r--r--container-messagebus/src/main/java/com/yahoo/messagebus/network/package-info.java5
-rw-r--r--container-messagebus/src/main/java/com/yahoo/messagebus/network/rpc/package-info.java5
-rw-r--r--container-messagebus/src/main/java/com/yahoo/messagebus/package-info.java5
-rw-r--r--container-messagebus/src/main/java/com/yahoo/messagebus/routing/package-info.java5
-rw-r--r--container-messagebus/src/main/java/com/yahoo/messagebus/shared/ClientSession.java14
-rw-r--r--container-messagebus/src/main/java/com/yahoo/messagebus/shared/NullNetwork.java73
-rw-r--r--container-messagebus/src/main/java/com/yahoo/messagebus/shared/ServerSession.java22
-rw-r--r--container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedDestinationSession.java85
-rw-r--r--container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedIntermediateSession.java104
-rw-r--r--container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedMessageBus.java68
-rw-r--r--container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedSourceSession.java58
-rw-r--r--container-messagebus/src/main/java/com/yahoo/messagebus/shared/package-info.java8
-rw-r--r--container-messagebus/src/main/java/com/yahoo/messagebus/test/package-info.java5
-rw-r--r--container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/ClientThreadingTestCase.java149
-rw-r--r--container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusClientTestCase.java345
-rw-r--r--container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusRequestHandlerTestCase.java121
-rw-r--r--container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusRequestTestCase.java73
-rw-r--r--container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusResponseTestCase.java46
-rw-r--r--container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusServerConformanceTest.java694
-rw-r--r--container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusServerTestCase.java374
-rw-r--r--container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/ServerThreadingTestCase.java137
-rw-r--r--container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/test/ClientTestDriverTestCase.java32
-rw-r--r--container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/test/ServerTestDriverTestCase.java34
-rw-r--r--container-messagebus/src/test/java/com/yahoo/messagebus/shared/SharedDestinationSessionTestCase.java134
-rw-r--r--container-messagebus/src/test/java/com/yahoo/messagebus/shared/SharedIntermediateSessionTestCase.java174
-rw-r--r--container-messagebus/src/test/java/com/yahoo/messagebus/shared/SharedMessageBusTestCase.java37
-rw-r--r--container-messagebus/src/test/java/com/yahoo/messagebus/shared/SharedSourceSessionTestCase.java94
41 files changed, 3914 insertions, 0 deletions
diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/IgnoredCompletionHandler.java b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/IgnoredCompletionHandler.java
new file mode 100644
index 00000000000..c64fea8653b
--- /dev/null
+++ b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/IgnoredCompletionHandler.java
@@ -0,0 +1,22 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.jdisc;
+
+import com.yahoo.jdisc.handler.CompletionHandler;
+
+/**
+ * @author Simon Thoresen Hult
+ */
+enum IgnoredCompletionHandler implements CompletionHandler {
+
+ INSTANCE;
+
+ @Override
+ public void completed() {
+
+ }
+
+ @Override
+ public void failed(final Throwable t) {
+
+ }
+}
diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusClient.java b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusClient.java
new file mode 100644
index 00000000000..922e4140868
--- /dev/null
+++ b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusClient.java
@@ -0,0 +1,147 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.jdisc;
+
+import com.google.inject.Inject;
+import com.yahoo.jdisc.AbstractResource;
+import com.yahoo.jdisc.Request;
+import com.yahoo.jdisc.ResourceReference;
+import com.yahoo.jdisc.handler.ContentChannel;
+import com.yahoo.jdisc.handler.RequestDeniedException;
+import com.yahoo.jdisc.handler.ResponseHandler;
+import com.yahoo.jdisc.service.ClientProvider;
+import java.util.logging.Level;
+import com.yahoo.messagebus.EmptyReply;
+import com.yahoo.messagebus.Error;
+import com.yahoo.messagebus.ErrorCode;
+import com.yahoo.messagebus.Message;
+import com.yahoo.messagebus.Reply;
+import com.yahoo.messagebus.ReplyHandler;
+import com.yahoo.messagebus.shared.ClientSession;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Logger;
+
+/**
+ * @author Simon Thoresen Hult
+ */
+public final class MbusClient extends AbstractResource implements ClientProvider, ReplyHandler {
+
+ private static final Logger log = Logger.getLogger(MbusClient.class.getName());
+ private final BlockingQueue<MbusRequest> queue = new LinkedBlockingQueue<>();
+ private final ClientSession session;
+ private final Thread thread = new Thread(new SenderTask(), "MbusClient");
+ private volatile boolean done = false;
+ private final ResourceReference sessionReference;
+
+ @Inject
+ public MbusClient(ClientSession session) {
+ this.session = session;
+ this.sessionReference = session.refer();
+ }
+
+ @Override
+ public void start() {
+ thread.start();
+ }
+
+ @Override
+ public ContentChannel handleRequest(Request request, ResponseHandler handler) {
+ if (!(request instanceof MbusRequest)) {
+ throw new RequestDeniedException(request);
+ }
+ final Message msg = ((MbusRequest)request).getMessage();
+ msg.getTrace().trace(6, "Request received by MbusClient.");
+ msg.pushHandler(null); // save user context
+ final Long timeout = request.timeRemaining(TimeUnit.MILLISECONDS);
+ if (timeout != null) {
+ msg.setTimeReceivedNow();
+ msg.setTimeRemaining(Math.max(1, timeout)); // negative or zero timeout has semantics
+ }
+ msg.setContext(handler);
+ msg.pushHandler(this);
+ sendBlocking((MbusRequest)request);
+ return null;
+ }
+
+ @Override
+ public void handleTimeout(Request request, final ResponseHandler handler) {
+ // ignore, mbus has guaranteed reply
+ }
+
+ @Override
+ protected void destroy() {
+ log.log(Level.FINE, "Destroying message bus client.");
+ sessionReference.close();
+ done = true;
+ }
+
+ @Override
+ public void handleReply(final Reply reply) {
+ reply.getTrace().trace(6, "Reply received by MbusClient.");
+ final ResponseHandler handler = (ResponseHandler)reply.getContext();
+ reply.popHandler(); // restore user context
+ try {
+ handler.handleResponse(new MbusResponse(StatusCodes.fromMbusReply(reply), reply))
+ .close(IgnoredCompletionHandler.INSTANCE);
+ } catch (final Exception e) {
+ log.log(Level.WARNING, "Ignoring exception thrown by ResponseHandler.", e);
+ }
+ }
+
+ private void sendBlocking(MbusRequest request) {
+ while (!sendMessage(request)) {
+ try {
+ Thread.sleep(5);
+ } catch (final InterruptedException e) {
+ // ignore
+ }
+ }
+ }
+
+ private boolean sendMessage(MbusRequest request) {
+ Error error;
+ final Long millis = request.timeRemaining(TimeUnit.MILLISECONDS);
+ if (millis != null && millis <= 0) {
+ error = new Error(ErrorCode.TIMEOUT, request.getTimeout(TimeUnit.MILLISECONDS) + " millis");
+ } else if (request.isCancelled()) {
+ error = new Error(ErrorCode.APP_FATAL_ERROR, "request cancelled");
+ } else {
+ try {
+ error = session.sendMessage(request.getMessage()).getError();
+ } catch (final Exception e) {
+ error = new Error(ErrorCode.FATAL_ERROR, e.toString());
+ }
+ }
+ if (error == null) {
+ return true;
+ }
+ if (error.isFatal()) {
+ final Reply reply = new EmptyReply();
+ reply.swapState(request.getMessage());
+ reply.addError(error);
+ reply.popHandler().handleReply(reply);
+ return true;
+ }
+ return false;
+ }
+
+ private class SenderTask implements Runnable {
+
+ @Override
+ public void run() {
+ while (!done) {
+ try {
+ final MbusRequest request = queue.poll(100, TimeUnit.MILLISECONDS);
+ if (request == null) {
+ continue;
+ }
+ sendBlocking(request);
+ } catch (final Exception e) {
+ log.log(Level.WARNING, "Ignoring exception thrown by MbusClient.", e);
+ }
+ }
+ }
+ }
+}
diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusRequest.java b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusRequest.java
new file mode 100644
index 00000000000..a0bedd678eb
--- /dev/null
+++ b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusRequest.java
@@ -0,0 +1,38 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.jdisc;
+
+import com.yahoo.jdisc.Request;
+import com.yahoo.jdisc.service.CurrentContainer;
+import com.yahoo.messagebus.Message;
+
+import java.net.URI;
+
+/**
+ * @author Simon Thoresen Hult
+ */
+public class MbusRequest extends Request {
+
+ private final Message message;
+
+ public MbusRequest(CurrentContainer current, URI uri, Message msg) {
+ super(current, uri);
+ this.message = validateMessage(msg);
+ }
+
+ public MbusRequest(Request parent, URI uri, Message msg) {
+ super(parent, uri);
+ this.message = validateMessage(msg);
+ }
+
+ public Message getMessage() {
+ return message;
+ }
+
+ private Message validateMessage(Message msg) {
+ if (msg != null) {
+ return msg;
+ }
+ release();
+ throw new NullPointerException();
+ }
+}
diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusRequestHandler.java b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusRequestHandler.java
new file mode 100644
index 00000000000..fb5657a9215
--- /dev/null
+++ b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusRequestHandler.java
@@ -0,0 +1,59 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.jdisc;
+
+import com.yahoo.jdisc.Request;
+import com.yahoo.jdisc.handler.AbstractRequestHandler;
+import com.yahoo.jdisc.handler.CompletionHandler;
+import com.yahoo.jdisc.handler.ContentChannel;
+import com.yahoo.jdisc.handler.ResponseHandler;
+import com.yahoo.messagebus.Message;
+import com.yahoo.messagebus.MessageHandler;
+import com.yahoo.messagebus.Reply;
+import com.yahoo.messagebus.ReplyHandler;
+
+/**
+ * @author Simon Thoresen Hult
+ */
+public abstract class MbusRequestHandler extends AbstractRequestHandler implements MessageHandler {
+
+ @Override
+ public ContentChannel handleRequest(final Request request, final ResponseHandler handler) {
+ if (!(request instanceof MbusRequest)) {
+ throw new UnsupportedOperationException("Expected MbusRequest, got " + request.getClass().getName() + ".");
+ }
+ final Message msg = ((MbusRequest)request).getMessage();
+ msg.pushHandler(new RespondingReplyHandler(handler));
+ handleMessage(msg);
+ return null;
+ }
+
+ private static class RespondingReplyHandler implements ReplyHandler {
+
+ private final ResponseHandler handler;
+
+ RespondingReplyHandler(final ResponseHandler handler) {
+ this.handler = handler;
+ }
+
+ @Override
+ public void handleReply(final Reply reply) {
+ final MbusResponse response = new MbusResponse(StatusCodes.fromMbusReply(reply), reply);
+ handler.handleResponse(response).close(IgnoringCompletionHandler.INSTANCE);
+ }
+ }
+
+ private static class IgnoringCompletionHandler implements CompletionHandler {
+
+ public static final IgnoringCompletionHandler INSTANCE = new IgnoringCompletionHandler();
+
+ @Override
+ public void completed() {
+
+ }
+
+ @Override
+ public void failed(final Throwable t) {
+
+ }
+ }
+}
diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusResponse.java b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusResponse.java
new file mode 100644
index 00000000000..37da4d8569f
--- /dev/null
+++ b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusResponse.java
@@ -0,0 +1,25 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.jdisc;
+
+import com.yahoo.jdisc.Response;
+import com.yahoo.messagebus.Reply;
+
+/**
+ * @author Simon Thoresen Hult
+ */
+public class MbusResponse extends Response {
+
+ private final Reply reply;
+
+ public MbusResponse(int status, Reply reply) {
+ super(status);
+ if (reply == null) {
+ throw new NullPointerException();
+ }
+ this.reply = reply;
+ }
+
+ public Reply getReply() {
+ return reply;
+ }
+}
diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusServer.java b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusServer.java
new file mode 100644
index 00000000000..e26e1e7e134
--- /dev/null
+++ b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/MbusServer.java
@@ -0,0 +1,135 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.jdisc;
+
+import com.google.inject.Inject;
+import com.yahoo.jdisc.AbstractResource;
+import com.yahoo.jdisc.Request;
+import com.yahoo.jdisc.ResourceReference;
+import com.yahoo.jdisc.Response;
+import com.yahoo.jdisc.handler.ContentChannel;
+import com.yahoo.jdisc.handler.ResponseHandler;
+import com.yahoo.jdisc.service.CurrentContainer;
+import com.yahoo.jdisc.service.ServerProvider;
+import java.util.logging.Level;
+import com.yahoo.messagebus.*;
+import com.yahoo.messagebus.Error;
+import com.yahoo.messagebus.shared.ServerSession;
+
+import java.net.URI;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Logger;
+
+/**
+ * @author Simon Thoresen Hult
+ */
+public final class MbusServer extends AbstractResource implements ServerProvider, MessageHandler {
+
+ private final static Logger log = Logger.getLogger(MbusServer.class.getName());
+ private final AtomicBoolean running = new AtomicBoolean(false);
+ private final CurrentContainer container;
+ private final ServerSession session;
+ private final URI uri;
+ private final ResourceReference sessionReference;
+
+ @Inject
+ public MbusServer(CurrentContainer container, ServerSession session) {
+ this.container = container;
+ this.session = session;
+ uri = URI.create("mbus://localhost/" + session.name());
+ session.setMessageHandler(this);
+ sessionReference = session.refer();
+ }
+
+ @Override
+ public void start() {
+ log.log(Level.FINE, "Starting message bus server.");
+ running.set(true);
+ }
+
+ @Override
+ public void close() {
+ log.log(Level.FINE, "Closing message bus server.");
+ running.set(false);
+ }
+
+ @Override
+ protected void destroy() {
+ log.log(Level.FINE, "Destroying message bus server.");
+ running.set(false);
+ sessionReference.close();
+ }
+
+ @Override
+ public void handleMessage(Message msg) {
+ if (!running.get()) {
+ dispatchErrorReply(msg, ErrorCode.SESSION_BUSY, "Session temporarily closed.");
+ return;
+ }
+ if (msg.getTrace().shouldTrace(6)) {
+ msg.getTrace().trace(6, "Message received by MbusServer.");
+ }
+ Request request = null;
+ ContentChannel content = null;
+ try {
+ request = new MbusRequest(container, uri, msg);
+ content = request.connect(new ServerResponseHandler(msg));
+ } catch (RuntimeException e) {
+ dispatchErrorReply(msg, ErrorCode.APP_FATAL_ERROR, e.toString());
+ } finally {
+ if (request != null) {
+ request.release();
+ }
+ }
+ if (content != null) {
+ content.close(IgnoredCompletionHandler.INSTANCE);
+ }
+ }
+
+ public String connectionSpec() {
+ return session.connectionSpec();
+ }
+
+ private void dispatchErrorReply(Message msg, int errCode, String errMsg) {
+ Reply reply = new EmptyReply();
+ reply.swapState(msg);
+ reply.addError(new Error(errCode, errMsg));
+ session.sendReply(reply);
+ }
+
+ private class ServerResponseHandler implements ResponseHandler {
+
+ final Message msg;
+
+ ServerResponseHandler(Message msg) {
+ this.msg = msg;
+ }
+
+ @Override
+ public ContentChannel handleResponse(Response response) {
+ Reply reply;
+ if (response instanceof MbusResponse) {
+ reply = ((MbusResponse)response).getReply();
+ } else {
+ reply = new EmptyReply();
+ reply.swapState(msg);
+ }
+ Error err = StatusCodes.toMbusError(response.getStatus());
+ if (err != null) {
+ if (err.isFatal()) {
+ if (!reply.hasFatalErrors()) {
+ reply.addError(err);
+ }
+ } else {
+ if (!reply.hasErrors()) {
+ reply.addError(err);
+ }
+ }
+ }
+ if (reply.getTrace().shouldTrace(6)) {
+ reply.getTrace().trace(6, "Sending reply from MbusServer.");
+ }
+ session.sendReply(reply);
+ return null;
+ }
+ }
+}
diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/StatusCodes.java b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/StatusCodes.java
new file mode 100644
index 00000000000..6570c910af3
--- /dev/null
+++ b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/StatusCodes.java
@@ -0,0 +1,77 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.jdisc;
+
+import com.yahoo.jdisc.Response;
+import com.yahoo.messagebus.Error;
+import com.yahoo.messagebus.ErrorCode;
+import com.yahoo.messagebus.Reply;
+
+/**
+ * @author Simon Thoresen Hult
+ */
+public class StatusCodes {
+
+ public static int fromMbusReply(final Reply reply) {
+ int statusCode = Response.Status.OK;
+ for (int i = 0, len = reply.getNumErrors(); i < len; ++i) {
+ statusCode = Math.max(statusCode, fromMbusError(reply.getError(i)));
+ }
+ return statusCode;
+ }
+
+ public static int fromMbusError(final Error error) {
+ final int errorCode = error.getCode();
+ if (errorCode < ErrorCode.TRANSIENT_ERROR) {
+ return Response.Status.OK;
+ }
+ if (errorCode < ErrorCode.FATAL_ERROR) {
+ return Response.Status.TEMPORARY_REDIRECT;
+ }
+ switch (errorCode) {
+ case ErrorCode.SEND_QUEUE_CLOSED:
+ return Response.Status.LOCKED;
+ case ErrorCode.ILLEGAL_ROUTE:
+ return Response.Status.BAD_REQUEST;
+ case ErrorCode.NO_SERVICES_FOR_ROUTE:
+ return Response.Status.NOT_FOUND;
+ case ErrorCode.ENCODE_ERROR:
+ return Response.Status.BAD_REQUEST;
+ case ErrorCode.NETWORK_ERROR:
+ return Response.Status.BAD_REQUEST; // got nothing better
+ case ErrorCode.UNKNOWN_PROTOCOL:
+ return Response.Status.UNSUPPORTED_MEDIA_TYPE;
+ case ErrorCode.DECODE_ERROR:
+ return Response.Status.UNSUPPORTED_MEDIA_TYPE;
+ case ErrorCode.TIMEOUT:
+ return Response.Status.REQUEST_TIMEOUT;
+ case ErrorCode.INCOMPATIBLE_VERSION:
+ return Response.Status.VERSION_NOT_SUPPORTED;
+ case ErrorCode.UNKNOWN_POLICY:
+ return Response.Status.BAD_REQUEST;
+ case ErrorCode.NETWORK_SHUTDOWN:
+ return Response.Status.LOCKED;
+ case ErrorCode.POLICY_ERROR:
+ return Response.Status.PRECONDITION_FAILED;
+ case ErrorCode.SEQUENCE_ERROR:
+ return Response.Status.PRECONDITION_FAILED;
+ case ErrorCode.APP_FATAL_ERROR:
+ return Response.Status.INTERNAL_SERVER_ERROR;
+ default:
+ return Response.Status.INTERNAL_SERVER_ERROR;
+ }
+ }
+
+ public static Error toMbusError(final int statusCode) {
+ if (statusCode < 300) {
+ return null;
+ } else if (statusCode < 400) {
+ return new Error(ErrorCode.APP_TRANSIENT_ERROR, statusCode + " Redirection");
+ } else if (statusCode < 500) {
+ return new Error(ErrorCode.APP_FATAL_ERROR, statusCode + " Client Error");
+ } else if (statusCode < 600) {
+ return new Error(ErrorCode.APP_FATAL_ERROR, statusCode + " Server Error");
+ } else {
+ return new Error(ErrorCode.APP_FATAL_ERROR, statusCode + " Unknown Error");
+ }
+ }
+}
diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/package-info.java b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/package-info.java
new file mode 100644
index 00000000000..9aea8cf7db8
--- /dev/null
+++ b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/package-info.java
@@ -0,0 +1,5 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+@ExportPackage
+package com.yahoo.messagebus.jdisc;
+
+import com.yahoo.osgi.annotation.ExportPackage;
diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/ClientTestDriver.java b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/ClientTestDriver.java
new file mode 100644
index 00000000000..111805d61b0
--- /dev/null
+++ b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/ClientTestDriver.java
@@ -0,0 +1,134 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.jdisc.test;
+
+import com.yahoo.jdisc.References;
+import com.yahoo.jdisc.Request;
+import com.yahoo.jdisc.ResourceReference;
+import com.yahoo.jdisc.application.ContainerBuilder;
+import com.yahoo.jdisc.handler.ResponseHandler;
+import com.yahoo.jdisc.test.TestDriver;
+import com.yahoo.messagebus.*;
+import com.yahoo.messagebus.jdisc.MbusClient;
+import com.yahoo.messagebus.jdisc.MbusRequest;
+import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
+import com.yahoo.messagebus.routing.Route;
+import com.yahoo.messagebus.shared.SharedMessageBus;
+import com.yahoo.messagebus.shared.SharedSourceSession;
+import com.yahoo.messagebus.test.SimpleProtocol;
+
+import java.net.URI;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author Simon Thoresen Hult
+ */
+public class ClientTestDriver {
+
+ private final RemoteServer server;
+ private final MbusClient client;
+ private final SharedSourceSession session;
+ private final TestDriver driver;
+
+ private ClientTestDriver(RemoteServer server, Protocol protocol) {
+ this.server = server;
+
+ MessageBusParams mbusParams = new MessageBusParams().addProtocol(protocol);
+ RPCNetworkParams netParams = new RPCNetworkParams().setSlobrokConfigId(server.slobrokId());
+ SharedMessageBus mbus = SharedMessageBus.newInstance(mbusParams, netParams);
+ session = mbus.newSourceSession(new SourceSessionParams());
+ client = new MbusClient(session);
+ client.start();
+ mbus.release();
+
+ driver = TestDriver.newSimpleApplicationInstanceWithoutOsgi();
+ ContainerBuilder builder = driver.newContainerBuilder();
+ builder.clientBindings().bind("mbus://*/*", client);
+ driver.activateContainer(builder);
+ }
+
+ public SourceSession sourceSession() {
+ return session.session();
+ }
+
+ public Request newServerRequest() {
+ return new Request(driver, URI.create("mbus://localhost/"));
+ }
+
+ public Request newClientRequest(Message msg) {
+ msg.setRoute(Route.parse(server.connectionSpec()));
+ if (msg.getTrace().getLevel() == 0) {
+ msg.getTrace().setLevel(9);
+ }
+ final Request parent = newServerRequest();
+ try (final ResourceReference ref = References.fromResource(parent)) {
+ return new MbusRequest(parent, URI.create("mbus://remotehost/"), msg);
+ }
+ }
+
+ public boolean sendRequest(Request request, ResponseHandler responseHandler) {
+ request.connect(responseHandler).close(null);
+ return true;
+ }
+
+ public boolean sendMessage(Message msg, ResponseHandler responseHandler) {
+ final Request request = newClientRequest(msg);
+ try (final ResourceReference ref = References.fromResource(request)) {
+ return sendRequest(request, responseHandler);
+ }
+ }
+
+ public Message awaitMessage() {
+ Message msg = null;
+ try {
+ msg = server.awaitMessage(60, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ if (msg != null) {
+ msg.getTrace().trace(0, "Message received by RemoteServer.");
+ }
+ return msg;
+ }
+
+ public void sendReply(Reply reply) {
+ reply.getTrace().trace(0, "Sending reply from RemoteServer.");
+ server.sendReply(reply);
+ }
+
+ public boolean awaitMessageAndSendReply(Reply reply) {
+ Message msg = awaitMessage();
+ if (msg == null) {
+ return false;
+ }
+ reply.swapState(msg);
+ sendReply(reply);
+ return true;
+ }
+
+ public boolean close() {
+ session.release();
+ client.release();
+ server.close();
+ return driver.close();
+ }
+
+ public MbusClient client() {
+ return client;
+ }
+
+ public RemoteServer server() {
+ return server;
+ }
+
+ public static ClientTestDriver newInstance() {
+ return new ClientTestDriver(RemoteServer.newInstanceWithInternSlobrok(), new SimpleProtocol());
+ }
+
+ public static ClientTestDriver newInstanceWithProtocol(Protocol protocol) {
+ return new ClientTestDriver(RemoteServer.newInstanceWithInternSlobrok(), protocol);
+ }
+
+ public static ClientTestDriver newInstanceWithExternSlobrok(String slobrokId) {
+ return new ClientTestDriver(RemoteServer.newInstanceWithExternSlobrok(slobrokId), new SimpleProtocol());
+ }
+}
diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/MessageQueue.java b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/MessageQueue.java
new file mode 100644
index 00000000000..c5287165e27
--- /dev/null
+++ b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/MessageQueue.java
@@ -0,0 +1,27 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.jdisc.test;
+
+import com.yahoo.messagebus.Message;
+import com.yahoo.messagebus.MessageHandler;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author Simon Thoresen Hult
+ */
+public class MessageQueue implements MessageHandler {
+
+ private final BlockingQueue<Message> queue = new LinkedBlockingQueue<>();
+
+ @Override
+ public void handleMessage(Message msg) {
+ queue.add(msg);
+ }
+
+ public Message awaitMessage(int timeout, TimeUnit unit) throws InterruptedException {
+ return queue.poll(timeout, unit);
+ }
+
+}
diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteClient.java b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteClient.java
new file mode 100644
index 00000000000..57d0abd980b
--- /dev/null
+++ b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteClient.java
@@ -0,0 +1,76 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.jdisc.test;
+
+import com.yahoo.jrt.ListenFailedException;
+import com.yahoo.jrt.slobrok.server.Slobrok;
+import com.yahoo.messagebus.*;
+import com.yahoo.messagebus.network.local.LocalNetwork;
+import com.yahoo.messagebus.network.rpc.RPCNetwork;
+import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
+import com.yahoo.messagebus.test.SimpleProtocol;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author Simon Thoresen Hult
+ */
+public class RemoteClient {
+
+ private final Slobrok slobrok;
+ private final String slobrokId;
+ private final MessageBus mbus;
+ private final ReplyQueue queue = new ReplyQueue();
+ private final SourceSession session;
+
+ private RemoteClient(Slobrok slobrok, String slobrokId, Protocol protocol, boolean network) {
+ this.slobrok = slobrok;
+ this.slobrokId = slobrok != null ? slobrok.configId() : slobrokId;
+ mbus = network
+ ? new MessageBus(new RPCNetwork(new RPCNetworkParams().setSlobrokConfigId(this.slobrokId)),
+ new MessageBusParams().addProtocol(protocol))
+ : new MessageBus(new LocalNetwork(), new MessageBusParams().addProtocol(protocol));
+ session = mbus.createSourceSession(new SourceSessionParams().setThrottlePolicy(null).setReplyHandler(queue));
+ }
+
+ public Result sendMessage(Message msg) {
+ return session.send(msg);
+ }
+
+ public Reply awaitReply(int timeout, TimeUnit unit) throws InterruptedException {
+ return queue.awaitReply(timeout, unit);
+ }
+
+ public String slobrokId() {
+ return slobrokId;
+ }
+
+ public void close() {
+ session.destroy();
+ mbus.destroy();
+ if (slobrok != null) {
+ slobrok.stop();
+ }
+ }
+
+ public static RemoteClient newInstanceWithInternSlobrok(boolean network) {
+ return new RemoteClient(newSlobrok(), null, new SimpleProtocol(), network);
+ }
+
+ public static RemoteClient newInstanceWithExternSlobrok(String slobrokId, boolean network) {
+ return new RemoteClient(null, slobrokId, new SimpleProtocol(), network);
+ }
+
+ public static RemoteClient newInstanceWithProtocolAndInternSlobrok(Protocol protocol, boolean network) {
+ return new RemoteClient(newSlobrok(), null, protocol, network);
+ }
+
+ private static Slobrok newSlobrok() {
+ Slobrok slobrok;
+ try {
+ slobrok = new Slobrok();
+ } catch (ListenFailedException e) {
+ throw new IllegalStateException(e);
+ }
+ return slobrok;
+ }
+}
diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteServer.java b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteServer.java
new file mode 100644
index 00000000000..1f0f82c4903
--- /dev/null
+++ b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteServer.java
@@ -0,0 +1,87 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.jdisc.test;
+
+import com.yahoo.jrt.ListenFailedException;
+import com.yahoo.jrt.slobrok.server.Slobrok;
+import com.yahoo.messagebus.*;
+import com.yahoo.messagebus.network.Identity;
+import com.yahoo.messagebus.network.rpc.RPCNetwork;
+import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
+import com.yahoo.messagebus.test.SimpleProtocol;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author Simon Thoresen Hult
+ */
+public class RemoteServer {
+
+ private final Slobrok slobrok;
+ private final String slobrokId;
+ private final MessageBus mbus;
+ private final MessageQueue queue = new MessageQueue();
+ private final DestinationSession session;
+
+ private RemoteServer(Slobrok slobrok, String slobrokId, Protocol protocol, String identity) {
+ this.slobrok = slobrok;
+ this.slobrokId = slobrok != null ? slobrok.configId() : slobrokId;
+ mbus = new MessageBus(new RPCNetwork(new RPCNetworkParams()
+ .setSlobrokConfigId(this.slobrokId)
+ .setIdentity(new Identity(identity))),
+ new MessageBusParams().addProtocol(protocol));
+ session = mbus.createDestinationSession(new DestinationSessionParams().setMessageHandler(queue));
+ }
+
+ public String connectionSpec() {
+ return session.getConnectionSpec();
+ }
+
+ public Message awaitMessage(int timeout, TimeUnit unit) throws InterruptedException {
+ return queue.awaitMessage(timeout, unit);
+ }
+
+ public void ackMessage(Message msg) {
+ session.acknowledge(msg);
+ }
+
+ public void sendReply(Reply reply) {
+ session.reply(reply);
+ }
+
+ public String slobrokId() {
+ return slobrokId;
+ }
+
+ public void close() {
+ session.destroy();
+ mbus.destroy();
+ if (slobrok != null) {
+ slobrok.stop();
+ }
+ }
+
+ public static RemoteServer newInstanceWithInternSlobrok() {
+ return new RemoteServer(newSlobrok(), null, new SimpleProtocol(), "remote");
+ }
+
+ public static RemoteServer newInstanceWithExternSlobrok(String slobrokId) {
+ return new RemoteServer(null, slobrokId, new SimpleProtocol(), "remote");
+ }
+
+ public static RemoteServer newInstance(String slobrokId, String identity, Protocol protocol) {
+ return new RemoteServer(null, slobrokId, protocol, identity);
+ }
+
+ public static RemoteServer newInstanceWithProtocol(Protocol protocol) {
+ return new RemoteServer(newSlobrok(), null, protocol, "remote");
+ }
+
+ private static Slobrok newSlobrok() {
+ try {
+ return new Slobrok();
+ } catch (ListenFailedException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+}
diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/ReplyQueue.java b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/ReplyQueue.java
new file mode 100644
index 00000000000..6c48aab5a7f
--- /dev/null
+++ b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/ReplyQueue.java
@@ -0,0 +1,26 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.jdisc.test;
+
+import com.yahoo.messagebus.Reply;
+import com.yahoo.messagebus.ReplyHandler;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author Simon Thoresen Hult
+ */
+public class ReplyQueue implements ReplyHandler {
+
+ private final BlockingQueue<Reply> queue = new LinkedBlockingQueue<>();
+
+ @Override
+ public void handleReply(Reply reply) {
+ queue.add(reply);
+ }
+
+ public Reply awaitReply(int timeout, TimeUnit unit) throws InterruptedException {
+ return queue.poll(timeout, unit);
+ }
+}
diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/ServerTestDriver.java b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/ServerTestDriver.java
new file mode 100644
index 00000000000..e59db28e886
--- /dev/null
+++ b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/ServerTestDriver.java
@@ -0,0 +1,155 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.jdisc.test;
+
+import com.google.inject.Module;
+import com.yahoo.jdisc.application.ContainerBuilder;
+import com.yahoo.jdisc.handler.RequestHandler;
+import com.yahoo.jdisc.test.TestDriver;
+import com.yahoo.messagebus.*;
+import com.yahoo.messagebus.Error;
+import com.yahoo.messagebus.jdisc.MbusServer;
+import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
+import com.yahoo.messagebus.routing.Route;
+import com.yahoo.messagebus.shared.ServerSession;
+import com.yahoo.messagebus.shared.SharedMessageBus;
+import com.yahoo.messagebus.test.SimpleProtocol;
+
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author Simon Thoresen Hult
+ */
+public class ServerTestDriver {
+
+ private final RemoteClient client;
+ private final MbusServer server;
+ private final TestDriver driver;
+
+ private ServerTestDriver(RemoteClient client, boolean activateContainer, RequestHandler requestHandler,
+ Protocol protocol, Module... guiceModules)
+ {
+ this.client = client;
+ driver = TestDriver.newSimpleApplicationInstanceWithoutOsgi(guiceModules);
+ if (activateContainer) {
+ ContainerBuilder builder = driver.newContainerBuilder();
+ if (requestHandler != null) {
+ builder.serverBindings().bind("mbus://*/*", requestHandler);
+ }
+ driver.activateContainer(builder);
+ }
+
+ MessageBusParams mbusParams = new MessageBusParams().addProtocol(protocol);
+ RPCNetworkParams netParams = new RPCNetworkParams().setSlobrokConfigId(client.slobrokId());
+ SharedMessageBus mbus = SharedMessageBus.newInstance(mbusParams, netParams);
+ ServerSession session = mbus.newDestinationSession(new DestinationSessionParams());
+ server = new MbusServer(driver, session);
+ server.start();
+ session.release();
+ mbus.release();
+ }
+
+ public boolean sendMessage(Message msg) {
+ msg.setRoute(Route.parse(server.connectionSpec()));
+ msg.getTrace().setLevel(9);
+ return client.sendMessage(msg).isAccepted();
+ }
+
+ public Reply awaitReply() {
+ Reply reply = null;
+ try {
+ reply = client.awaitReply(60, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ if (reply != null) {
+ System.out.println(reply.getTrace());
+ }
+ return reply;
+ }
+
+ public Reply awaitSuccess() {
+ Reply reply = awaitReply();
+ if (reply == null || reply.hasErrors()) {
+ return null;
+ }
+ return reply;
+ }
+
+ public Reply awaitErrors(Integer... errCodes) {
+ Reply reply = awaitReply();
+ if (reply == null) {
+ return null;
+ }
+ List<Integer> lst = new LinkedList<>(Arrays.asList(errCodes));
+ for (int i = 0, len = reply.getNumErrors(); i < len; ++i) {
+ Error err = reply.getError(i);
+ System.out.println(err);
+ int idx = lst.indexOf(err.getCode());
+ if (idx < 0) {
+ return null;
+ }
+ lst.remove(idx);
+ }
+ if (!lst.isEmpty()) {
+ return null;
+ }
+ return reply;
+ }
+
+ public boolean close() {
+ server.close();
+ server.release();
+ client.close();
+ return driver.close();
+ }
+
+ public TestDriver parent() {
+ return driver;
+ }
+
+ public RemoteClient client() {
+ return client;
+ }
+
+ public MbusServer server() {
+ return server;
+ }
+
+ public static ServerTestDriver newInstance(RequestHandler requestHandler, boolean network, Module... guiceModules) {
+ return new ServerTestDriver(RemoteClient.newInstanceWithInternSlobrok(network), true, requestHandler,
+ new SimpleProtocol(), guiceModules);
+ }
+
+ public static ServerTestDriver newInstanceWithProtocol(Protocol protocol, RequestHandler requestHandler,
+ boolean network, Module... guiceModules)
+ {
+ return new ServerTestDriver(RemoteClient.newInstanceWithInternSlobrok(network), true, requestHandler, protocol,
+ guiceModules);
+ }
+
+ public static ServerTestDriver newInstanceWithExternSlobrok(String slobrokId, RequestHandler requestHandler,
+ boolean network, Module... guiceModules)
+ {
+ return new ServerTestDriver(RemoteClient.newInstanceWithExternSlobrok(slobrokId, network),
+ true, requestHandler, new SimpleProtocol(), guiceModules);
+ }
+
+ public static ServerTestDriver newInactiveInstance(boolean network, Module... guiceModules) {
+ return new ServerTestDriver(RemoteClient.newInstanceWithInternSlobrok(network), false, null,
+ new SimpleProtocol(), guiceModules);
+ }
+
+ public static ServerTestDriver newInactiveInstanceWithProtocol(Protocol protocol, boolean network, Module... guiceModules) {
+ return new ServerTestDriver(RemoteClient.newInstanceWithProtocolAndInternSlobrok(protocol, network), false, null,
+ protocol, guiceModules);
+ }
+
+ public static ServerTestDriver newUnboundInstance(boolean network, Module... guiceModules) {
+ return new ServerTestDriver(RemoteClient.newInstanceWithInternSlobrok(network), true, null,
+ new SimpleProtocol(), guiceModules);
+ }
+
+}
diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/network/package-info.java b/container-messagebus/src/main/java/com/yahoo/messagebus/network/package-info.java
new file mode 100644
index 00000000000..72f563e8bd7
--- /dev/null
+++ b/container-messagebus/src/main/java/com/yahoo/messagebus/network/package-info.java
@@ -0,0 +1,5 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+@ExportPackage
+package com.yahoo.messagebus.network;
+
+import com.yahoo.osgi.annotation.ExportPackage;
diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/network/rpc/package-info.java b/container-messagebus/src/main/java/com/yahoo/messagebus/network/rpc/package-info.java
new file mode 100644
index 00000000000..7b468813713
--- /dev/null
+++ b/container-messagebus/src/main/java/com/yahoo/messagebus/network/rpc/package-info.java
@@ -0,0 +1,5 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+@ExportPackage
+package com.yahoo.messagebus.network.rpc;
+
+import com.yahoo.osgi.annotation.ExportPackage;
diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/package-info.java b/container-messagebus/src/main/java/com/yahoo/messagebus/package-info.java
new file mode 100644
index 00000000000..63b713e70e0
--- /dev/null
+++ b/container-messagebus/src/main/java/com/yahoo/messagebus/package-info.java
@@ -0,0 +1,5 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+@ExportPackage
+package com.yahoo.messagebus;
+
+import com.yahoo.osgi.annotation.ExportPackage;
diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/routing/package-info.java b/container-messagebus/src/main/java/com/yahoo/messagebus/routing/package-info.java
new file mode 100644
index 00000000000..ba8fc5fafba
--- /dev/null
+++ b/container-messagebus/src/main/java/com/yahoo/messagebus/routing/package-info.java
@@ -0,0 +1,5 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+@ExportPackage
+package com.yahoo.messagebus.routing;
+
+import com.yahoo.osgi.annotation.ExportPackage;
diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/shared/ClientSession.java b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/ClientSession.java
new file mode 100644
index 00000000000..0964a254cf2
--- /dev/null
+++ b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/ClientSession.java
@@ -0,0 +1,14 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.shared;
+
+import com.yahoo.jdisc.SharedResource;
+import com.yahoo.messagebus.Message;
+import com.yahoo.messagebus.Result;
+
+/**
+ * @author Simon Thoresen Hult
+ */
+public interface ClientSession extends SharedResource {
+
+ public Result sendMessage(Message msg);
+}
diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/shared/NullNetwork.java b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/NullNetwork.java
new file mode 100644
index 00000000000..ad58d6b9a5e
--- /dev/null
+++ b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/NullNetwork.java
@@ -0,0 +1,73 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.shared;
+
+import com.yahoo.jrt.slobrok.api.IMirror;
+import com.yahoo.messagebus.Message;
+import com.yahoo.messagebus.network.Network;
+import com.yahoo.messagebus.network.NetworkOwner;
+import com.yahoo.messagebus.routing.RoutingNode;
+
+import java.util.List;
+
+/**
+ * <p>Used by SharedMessageBus as a network when the container runs in LocalApplication with no network services.</p>
+ *
+ * @author <a href="mailto:vegardh@yahoo-inc.com">Vegard Havdal</a>
+ */
+class NullNetwork implements Network {
+
+ @Override
+ public boolean waitUntilReady(double seconds) {
+ return true;
+ }
+
+ @Override
+ public void attach(NetworkOwner owner) {
+
+ }
+
+ @Override
+ public void registerSession(String session) {
+
+ }
+
+ @Override
+ public void unregisterSession(String session) {
+
+ }
+
+ @Override
+ public boolean allocServiceAddress(RoutingNode recipient) {
+ return false;
+ }
+
+ @Override
+ public void freeServiceAddress(RoutingNode recipient) {
+
+ }
+
+ @Override
+ public void send(Message msg, List<RoutingNode> recipients) {
+
+ }
+
+ @Override
+ public void sync() {
+
+ }
+
+ @Override
+ public void shutdown() {
+
+ }
+
+ @Override
+ public String getConnectionSpec() {
+ return null;
+ }
+
+ @Override
+ public IMirror getMirror() {
+ return null;
+ }
+}
diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/shared/ServerSession.java b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/ServerSession.java
new file mode 100644
index 00000000000..56713815c7a
--- /dev/null
+++ b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/ServerSession.java
@@ -0,0 +1,22 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.shared;
+
+import com.yahoo.jdisc.SharedResource;
+import com.yahoo.messagebus.MessageHandler;
+import com.yahoo.messagebus.Reply;
+
+/**
+ * @author Simon Thoresen Hult
+ */
+public interface ServerSession extends SharedResource {
+
+ public MessageHandler getMessageHandler();
+
+ public void setMessageHandler(MessageHandler msgHandler);
+
+ public void sendReply(Reply reply);
+
+ public String connectionSpec();
+
+ public String name();
+}
diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedDestinationSession.java b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedDestinationSession.java
new file mode 100644
index 00000000000..7da164757cd
--- /dev/null
+++ b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedDestinationSession.java
@@ -0,0 +1,85 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.shared;
+
+import com.yahoo.jdisc.AbstractResource;
+import com.yahoo.jdisc.ResourceReference;
+import java.util.logging.Level;
+import com.yahoo.messagebus.DestinationSession;
+import com.yahoo.messagebus.DestinationSessionParams;
+import com.yahoo.messagebus.EmptyReply;
+import com.yahoo.messagebus.Error;
+import com.yahoo.messagebus.ErrorCode;
+import com.yahoo.messagebus.Message;
+import com.yahoo.messagebus.MessageHandler;
+import com.yahoo.messagebus.Reply;
+
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.logging.Logger;
+
+/**
+ * @author Simon Thoresen Hult
+ */
+public class SharedDestinationSession extends AbstractResource implements MessageHandler, ServerSession {
+
+ private static final Logger log = Logger.getLogger(SharedDestinationSession.class.getName());
+ private final AtomicReference<MessageHandler> msgHandler = new AtomicReference<>();
+ private final DestinationSession session;
+ private final ResourceReference mbusReference;
+
+ SharedDestinationSession(SharedMessageBus mbus, DestinationSessionParams params) {
+ this.msgHandler.set(params.getMessageHandler());
+ this.session = mbus.messageBus().createDestinationSession(params.setMessageHandler(this));
+ this.mbusReference = mbus.refer();
+ }
+
+ public DestinationSession session() {
+ return session;
+ }
+
+ @Override
+ public void sendReply(Reply reply) {
+ session.reply(reply);
+ }
+
+ @Override
+ public MessageHandler getMessageHandler() {
+ return msgHandler.get();
+ }
+
+ @Override
+ public void setMessageHandler(MessageHandler msgHandler) {
+ if (!this.msgHandler.compareAndSet(null, msgHandler)) {
+ throw new IllegalStateException("Message handler already registered.");
+ }
+ }
+
+ @Override
+ public void handleMessage(Message msg) {
+ MessageHandler msgHandler = this.msgHandler.get();
+ if (msgHandler == null) {
+ Reply reply = new EmptyReply();
+ reply.swapState(msg);
+ reply.addError(new Error(ErrorCode.SESSION_BUSY, "Session not fully configured yet."));
+ sendReply(reply);
+ return;
+ }
+ msgHandler.handleMessage(msg);
+ }
+
+ @Override
+ public String connectionSpec() {
+ return session.getConnectionSpec();
+ }
+
+ @Override
+ public String name() {
+ return session.getName();
+ }
+
+ @Override
+ protected void destroy() {
+ log.log(Level.FINE, "Destroying shared destination session.");
+ session.destroy();
+ mbusReference.close();
+ }
+}
diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedIntermediateSession.java b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedIntermediateSession.java
new file mode 100644
index 00000000000..5c9fab46e34
--- /dev/null
+++ b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedIntermediateSession.java
@@ -0,0 +1,104 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.shared;
+
+import com.yahoo.jdisc.AbstractResource;
+import com.yahoo.jdisc.ResourceReference;
+import java.util.logging.Level;
+import com.yahoo.messagebus.EmptyReply;
+import com.yahoo.messagebus.Error;
+import com.yahoo.messagebus.ErrorCode;
+import com.yahoo.messagebus.IntermediateSession;
+import com.yahoo.messagebus.IntermediateSessionParams;
+import com.yahoo.messagebus.Message;
+import com.yahoo.messagebus.MessageHandler;
+import com.yahoo.messagebus.Reply;
+import com.yahoo.messagebus.ReplyHandler;
+import com.yahoo.messagebus.Result;
+
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.logging.Logger;
+
+/**
+ * @author Simon Thoresen Hult
+ */
+public class SharedIntermediateSession extends AbstractResource
+ implements ClientSession, ServerSession, MessageHandler, ReplyHandler
+{
+
+ private static final Logger log = Logger.getLogger(SharedIntermediateSession.class.getName());
+ private final AtomicReference<MessageHandler> msgHandler = new AtomicReference<>();
+ private final IntermediateSession session;
+ private final ResourceReference mbusReference;
+
+ public SharedIntermediateSession(SharedMessageBus mbus, IntermediateSessionParams params) {
+ if (params.getReplyHandler() != null) {
+ throw new IllegalArgumentException("Reply handler must be null.");
+ }
+ this.msgHandler.set(params.getMessageHandler());
+ this.session = mbus.messageBus().createIntermediateSession(params.setReplyHandler(this)
+ .setMessageHandler(this));
+ this.mbusReference = mbus.refer();
+ }
+
+ public IntermediateSession session() {
+ return session;
+ }
+
+ @Override
+ public Result sendMessage(Message msg) {
+ session.forward(msg);
+ return Result.ACCEPTED;
+ }
+
+ @Override
+ public void sendReply(Reply reply) {
+ session.forward(reply);
+ }
+
+ @Override
+ public MessageHandler getMessageHandler() {
+ return msgHandler.get();
+ }
+
+ @Override
+ public void setMessageHandler(MessageHandler msgHandler) {
+ if (!this.msgHandler.compareAndSet(null, msgHandler)) {
+ throw new IllegalStateException("Message handler already registered.");
+ }
+ }
+
+ @Override
+ public void handleMessage(Message msg) {
+ MessageHandler msgHandler = this.msgHandler.get();
+ if (msgHandler == null) {
+ Reply reply = new EmptyReply();
+ reply.swapState(msg);
+ reply.addError(new Error(ErrorCode.SESSION_BUSY, "Session not fully configured yet."));
+ sendReply(reply);
+ return;
+ }
+ msgHandler.handleMessage(msg);
+ }
+
+ @Override
+ public void handleReply(Reply reply) {
+ reply.popHandler().handleReply(reply);
+ }
+
+ @Override
+ public String connectionSpec() {
+ return session.getConnectionSpec();
+ }
+
+ @Override
+ public String name() {
+ return session.getName();
+ }
+
+ @Override
+ protected void destroy() {
+ log.log(Level.FINE, "Destroying shared intermediate session.");
+ session.destroy();
+ mbusReference.close();
+ }
+}
diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedMessageBus.java b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedMessageBus.java
new file mode 100644
index 00000000000..dd135a51378
--- /dev/null
+++ b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedMessageBus.java
@@ -0,0 +1,68 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.shared;
+
+import com.yahoo.config.subscription.ConfigGetter;
+import com.yahoo.jdisc.AbstractResource;
+import java.util.logging.Level;
+import com.yahoo.messagebus.DestinationSessionParams;
+import com.yahoo.messagebus.IntermediateSessionParams;
+import com.yahoo.messagebus.MessageBus;
+import com.yahoo.messagebus.MessageBusParams;
+import com.yahoo.messagebus.SourceSessionParams;
+import com.yahoo.messagebus.network.Network;
+import com.yahoo.messagebus.network.rpc.RPCNetwork;
+import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
+import com.yahoo.cloud.config.SlobroksConfig;
+
+import java.util.logging.Logger;
+
+/**
+ * @author Simon Thoresen Hult
+ */
+public class SharedMessageBus extends AbstractResource {
+
+ private static final Logger log = Logger.getLogger(SharedMessageBus.class.getName());
+ private final MessageBus mbus;
+
+ public SharedMessageBus(MessageBus mbus) {
+ mbus.getClass(); // throws NullPointerException
+ this.mbus = mbus;
+ }
+
+ public MessageBus messageBus() {
+ return mbus;
+ }
+
+ @Override
+ protected void destroy() {
+ log.log(Level.FINE, "Destroying shared message bus.");
+ mbus.destroy();
+ }
+
+ public SharedSourceSession newSourceSession(SourceSessionParams params) {
+ return new SharedSourceSession(this, params);
+ }
+
+ public SharedIntermediateSession newIntermediateSession(IntermediateSessionParams params) {
+ return new SharedIntermediateSession(this, params);
+ }
+
+ public SharedDestinationSession newDestinationSession(DestinationSessionParams params) {
+ return new SharedDestinationSession(this, params);
+ }
+
+ public static SharedMessageBus newInstance(MessageBusParams mbusParams, RPCNetworkParams netParams) {
+ return new SharedMessageBus(new MessageBus(newNetwork(netParams), mbusParams));
+ }
+
+ private static Network newNetwork(RPCNetworkParams params) {
+ SlobroksConfig cfg = params.getSlobroksConfig();
+ if (cfg == null) {
+ cfg = ConfigGetter.getConfig(SlobroksConfig.class, params.getSlobrokConfigId());
+ }
+ if (cfg.slobrok().isEmpty()) {
+ return new NullNetwork(); // for LocalApplication
+ }
+ return new RPCNetwork(params);
+ }
+}
diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedSourceSession.java b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedSourceSession.java
new file mode 100644
index 00000000000..56071682349
--- /dev/null
+++ b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/SharedSourceSession.java
@@ -0,0 +1,58 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.shared;
+
+import com.yahoo.jdisc.AbstractResource;
+import com.yahoo.jdisc.ResourceReference;
+import java.util.logging.Level;
+import com.yahoo.messagebus.Message;
+import com.yahoo.messagebus.Reply;
+import com.yahoo.messagebus.ReplyHandler;
+import com.yahoo.messagebus.Result;
+import com.yahoo.messagebus.SourceSession;
+import com.yahoo.messagebus.SourceSessionParams;
+
+import java.util.logging.Logger;
+
+/**
+ * @author Simon Thoresen Hult
+ */
+public class SharedSourceSession extends AbstractResource implements ClientSession, ReplyHandler {
+
+ private static final Logger log = Logger.getLogger(SharedSourceSession.class.getName());
+ private final SourceSession session;
+ private final ResourceReference mbusReference;
+
+ public SharedSourceSession(SharedMessageBus mbus, SourceSessionParams params) {
+ if (params.getReplyHandler() != null) {
+ throw new IllegalArgumentException("Reply handler must be null.");
+ }
+ this.session = mbus.messageBus().createSourceSession(params.setReplyHandler(this));
+ this.mbusReference = mbus.refer();
+ }
+
+ public SourceSession session() {
+ return session;
+ }
+
+ @Override
+ public Result sendMessage(Message msg) {
+ return session.send(msg);
+ }
+
+ public Result sendMessageBlocking(Message msg) throws InterruptedException {
+ return session.sendBlocking(msg);
+ }
+
+ @Override
+ public void handleReply(Reply reply) {
+ reply.popHandler().handleReply(reply);
+ }
+
+ @Override
+ protected void destroy() {
+ log.log(Level.FINE, "Destroying shared source session.");
+ session.close();
+ mbusReference.close();
+ }
+
+}
diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/shared/package-info.java b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/package-info.java
new file mode 100644
index 00000000000..941a0dc4c5c
--- /dev/null
+++ b/container-messagebus/src/main/java/com/yahoo/messagebus/shared/package-info.java
@@ -0,0 +1,8 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+/**
+ * Not a public API, exported for use in internal components.
+ */
+@ExportPackage
+package com.yahoo.messagebus.shared;
+
+import com.yahoo.osgi.annotation.ExportPackage;
diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/test/package-info.java b/container-messagebus/src/main/java/com/yahoo/messagebus/test/package-info.java
new file mode 100644
index 00000000000..42bc03b6e17
--- /dev/null
+++ b/container-messagebus/src/main/java/com/yahoo/messagebus/test/package-info.java
@@ -0,0 +1,5 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+@ExportPackage
+package com.yahoo.messagebus.test;
+
+import com.yahoo.osgi.annotation.ExportPackage;
diff --git a/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/ClientThreadingTestCase.java b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/ClientThreadingTestCase.java
new file mode 100644
index 00000000000..62a9a864781
--- /dev/null
+++ b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/ClientThreadingTestCase.java
@@ -0,0 +1,149 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.jdisc;
+
+import com.yahoo.jdisc.application.ContainerBuilder;
+import com.yahoo.jdisc.handler.FutureResponse;
+import com.yahoo.jdisc.handler.ResponseHandler;
+import com.yahoo.jdisc.test.TestDriver;
+import com.yahoo.messagebus.DestinationSession;
+import com.yahoo.messagebus.DestinationSessionParams;
+import com.yahoo.messagebus.Message;
+import com.yahoo.messagebus.MessageBus;
+import com.yahoo.messagebus.MessageBusParams;
+import com.yahoo.messagebus.MessageHandler;
+import com.yahoo.messagebus.SourceSessionParams;
+import com.yahoo.messagebus.network.local.LocalNetwork;
+import com.yahoo.messagebus.network.local.LocalWire;
+import com.yahoo.messagebus.routing.Route;
+import com.yahoo.messagebus.shared.SharedMessageBus;
+import com.yahoo.messagebus.shared.SharedSourceSession;
+import com.yahoo.messagebus.test.SimpleMessage;
+import com.yahoo.messagebus.test.SimpleProtocol;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.net.URI;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * @author Simon Thoresen Hult
+ */
+public class ClientThreadingTestCase {
+
+ private static final int NUM_THREADS = 32;
+ private static final int NUM_REQUESTS = 1000;
+
+ @Test
+ @Ignore
+ public void requireThatClientIsThreadSafe() throws Exception {
+ final LocalWire wire = new LocalWire();
+ final Client client = new Client(wire);
+ final Server server = new Server(wire);
+
+ final List<Callable<Boolean>> lst = new LinkedList<>();
+ final Route route = Route.parse(server.session.getConnectionSpec());
+ for (int i = 0; i < NUM_THREADS; ++i) {
+ lst.add(new RequestTask(client, route));
+ }
+ final ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);
+ for (final Future<Boolean> res : executor.invokeAll(lst, 60, TimeUnit.SECONDS)) {
+ assertThat(res.get(), is(true));
+ }
+
+ assertThat(client.close(), is(true));
+ assertThat(server.close(), is(true));
+ }
+
+ private static final class RequestTask implements Callable<Boolean> {
+
+ final Client client;
+ final Route route;
+
+ RequestTask(final Client client, final Route route) {
+ this.client = client;
+ this.route = route;
+ }
+
+ @Override
+ public Boolean call() throws Exception {
+ for (int i = 0; i < NUM_REQUESTS; ++i) {
+ final FutureResponse responseHandler = new FutureResponse();
+ client.send(new SimpleMessage("foo").setRoute(route), responseHandler);
+ responseHandler.get(60, TimeUnit.SECONDS);
+ }
+ return true;
+ }
+ }
+
+ private static class Client {
+
+ final MbusClient delegate;
+ final TestDriver driver;
+
+ Client(final LocalWire wire) {
+ driver = TestDriver.newSimpleApplicationInstanceWithoutOsgi();
+ delegate = newMbusClient(wire);
+
+ final ContainerBuilder builder = driver.newContainerBuilder();
+ builder.clientBindings().bind("mbus://*/*", delegate);
+ driver.activateContainer(builder);
+ delegate.start();
+ }
+
+ void send(final Message msg, final ResponseHandler handler) {
+ final MbusRequest request = new MbusRequest(driver, URI.create("mbus://remote/"), msg);
+ request.setServerRequest(false);
+ request.connect(handler).close(null);
+ request.release();
+ }
+
+ boolean close() {
+ delegate.release();
+ return driver.close();
+ }
+ }
+
+ private static class Server implements MessageHandler {
+
+ final MessageBus mbus;
+ final DestinationSession session;
+
+ Server(final LocalWire wire) {
+ mbus = new MessageBus(
+ new LocalNetwork(wire),
+ new MessageBusParams().addProtocol(new SimpleProtocol()));
+ session = mbus.createDestinationSession(
+ new DestinationSessionParams().setMessageHandler(this));
+ }
+
+ @Override
+ public void handleMessage(final Message msg) {
+ session.acknowledge(msg);
+ }
+
+ boolean close() {
+ return session.destroy() && mbus.destroy();
+ }
+ }
+
+ private static MbusClient newMbusClient(final LocalWire wire) {
+ final SharedMessageBus mbus = new SharedMessageBus(new MessageBus(
+ new LocalNetwork(wire),
+ new MessageBusParams().addProtocol(new SimpleProtocol())));
+ final SharedSourceSession session = mbus.newSourceSession(
+ new SourceSessionParams());
+ final MbusClient client = new MbusClient(session);
+ session.release();
+ mbus.release();
+ return client;
+ }
+}
diff --git a/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusClientTestCase.java b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusClientTestCase.java
new file mode 100644
index 00000000000..9cfd1fd02b9
--- /dev/null
+++ b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusClientTestCase.java
@@ -0,0 +1,345 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.jdisc;
+
+import com.yahoo.jdisc.Request;
+import com.yahoo.jdisc.ResourceReference;
+import com.yahoo.jdisc.Response;
+import com.yahoo.jdisc.handler.CompletionHandler;
+import com.yahoo.jdisc.handler.ContentChannel;
+import com.yahoo.jdisc.handler.RequestDeniedException;
+import com.yahoo.jdisc.handler.ResponseHandler;
+import com.yahoo.messagebus.*;
+import com.yahoo.messagebus.jdisc.test.ClientTestDriver;
+import com.yahoo.messagebus.shared.ClientSession;
+import com.yahoo.messagebus.test.SimpleMessage;
+import com.yahoo.messagebus.test.SimpleReply;
+import org.junit.Test;
+
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.*;
+
+/**
+ * @author Simon Thoresen Hult
+ */
+public class MbusClientTestCase {
+
+ @Test
+ public void requireThatClientRetainsSession() {
+ MySession session = new MySession();
+ assertEquals(1, session.refCount);
+ MbusClient client = new MbusClient(session);
+ assertEquals(2, session.refCount);
+ session.release();
+ assertEquals(1, session.refCount);
+ client.destroy();
+ assertEquals(0, session.refCount);
+ }
+
+ @Test
+ public void requireThatRequestResponseWorks() {
+ ClientTestDriver driver = ClientTestDriver.newInstance();
+ MyResponseHandler responseHandler = MyResponseHandler.newInstance();
+ assertTrue(driver.sendMessage(new SimpleMessage("foo"), responseHandler));
+ assertTrue(driver.awaitMessageAndSendReply(new EmptyReply()));
+
+ Response response = responseHandler.awaitResponse();
+ assertNotNull(response);
+ assertEquals(Response.Status.OK, response.getStatus());
+ assertTrue(driver.close());
+ }
+
+ @Test
+ public void requireThatNonMbusRequestIsDenied() throws InterruptedException {
+ ClientTestDriver driver = ClientTestDriver.newInstance();
+ Request serverReq = null;
+ Request clientReq = null;
+ try {
+ serverReq = driver.newServerRequest();
+ clientReq = new Request(serverReq, URI.create("mbus://host/path"));
+ clientReq.connect(MyResponseHandler.newInstance());
+ fail();
+ } catch (RequestDeniedException e) {
+ System.out.println(e.getMessage());
+ } finally {
+ if (serverReq != null) {
+ serverReq.release();
+ }
+ if (clientReq != null) {
+ clientReq.release();
+ }
+ }
+ assertTrue(driver.close());
+ }
+
+ @Test
+ public void requireThatRequestContentDoesNotSupportWrite() throws InterruptedException {
+ ClientTestDriver driver = ClientTestDriver.newInstance();
+ MyResponseHandler responseHandler = MyResponseHandler.newInstance();
+
+ Request request = null;
+ ContentChannel content;
+ try {
+ request = driver.newClientRequest(new SimpleMessage("foo"));
+ content = request.connect(responseHandler);
+ } finally {
+ if (request != null) {
+ request.release();
+ }
+ }
+ try {
+ content.write(ByteBuffer.allocate(69), null);
+ fail();
+ } catch (UnsupportedOperationException e) {
+
+ }
+ content.close(null);
+
+ assertTrue(driver.awaitMessageAndSendReply(new EmptyReply()));
+ assertNotNull(responseHandler.awaitResponse());
+ assertTrue(driver.close());
+ }
+
+ @Test
+ public void requireThatResponseIsMbus() {
+ ClientTestDriver driver = ClientTestDriver.newInstance();
+ MyResponseHandler responseHandler = MyResponseHandler.newInstance();
+ assertTrue(driver.sendMessage(new SimpleMessage("foo"), responseHandler));
+ assertTrue(driver.awaitMessageAndSendReply(new EmptyReply()));
+
+ Response response = responseHandler.awaitResponse();
+ assertTrue(response instanceof MbusResponse);
+ Reply reply = ((MbusResponse)response).getReply();
+ assertTrue(reply instanceof EmptyReply);
+ assertTrue(driver.close());
+ }
+
+ @Test
+ public void requireThatServerReceivesGivenMessage() {
+ ClientTestDriver driver = ClientTestDriver.newInstance();
+ MyResponseHandler responseHandler = MyResponseHandler.newInstance();
+ assertTrue(driver.sendMessage(new SimpleMessage("foo"), responseHandler));
+
+ Message msg = driver.awaitMessage();
+ assertTrue(msg instanceof SimpleMessage);
+ assertEquals("foo", ((SimpleMessage)msg).getValue());
+
+ Reply reply = new EmptyReply();
+ reply.swapState(msg);
+ driver.sendReply(reply);
+
+ assertNotNull(responseHandler.awaitResponse());
+ assertTrue(driver.close());
+ }
+
+ @Test
+ public void requireThatClientReceivesGivenReply() {
+ ClientTestDriver driver = ClientTestDriver.newInstance();
+ MyResponseHandler responseHandler = MyResponseHandler.newInstance();
+ assertTrue(driver.sendMessage(new SimpleMessage("foo"), responseHandler));
+
+ Message msg = driver.awaitMessage(); // TODO: Timing sensitive
+ assertNotNull(msg);
+ Reply reply = new SimpleReply("bar");
+ reply.swapState(msg);
+ driver.sendReply(reply);
+
+ Response response = responseHandler.awaitResponse();
+ assertTrue(response instanceof MbusResponse);
+ reply = ((MbusResponse)response).getReply();
+ assertTrue(reply instanceof SimpleReply);
+ assertEquals("bar", ((SimpleReply)reply).getValue());
+ assertTrue(driver.close());
+ }
+
+ @Test
+ public void requireThatStateIsTransferredToResponse() {
+ ClientTestDriver driver = ClientTestDriver.newInstance();
+ MyResponseHandler responseHandler = MyResponseHandler.newInstance();
+
+ Message msg = new SimpleMessage("foo");
+ Object pushedCtx = new Object();
+ msg.setContext(pushedCtx);
+ ReplyHandler pushedHandler = new MyReplyHandler();
+ msg.pushHandler(pushedHandler);
+ Object currentCtx = new Object();
+ msg.setContext(currentCtx);
+ msg.getTrace().setLevel(6);
+ assertTrue(driver.sendMessage(msg, responseHandler));
+ assertTrue(driver.awaitMessageAndSendReply(new EmptyReply()));
+
+ Response response = responseHandler.awaitResponse();
+ assertTrue(response.getClass().getName(), response instanceof MbusResponse);
+ Reply reply = ((MbusResponse)response).getReply();
+ assertSame(currentCtx, reply.getContext());
+ assertEquals(6, reply.getTrace().getLevel());
+ assertSame(pushedHandler, reply.popHandler());
+ assertSame(pushedCtx, reply.getContext());
+ assertTrue(driver.close());
+ }
+
+ @Test
+ public void requireThatStateIsTransferredToSyncMbusSendFailureResponse() {
+ ClientTestDriver driver = ClientTestDriver.newInstance();
+ driver.sourceSession().close();
+
+ Message msg = new SimpleMessage("foo");
+ ReplyHandler pushedHandler = new MyReplyHandler();
+ Object pushedCtx = new Object();
+ msg.setContext(pushedCtx);
+ msg.pushHandler(pushedHandler);
+ Object currentCtx = new Object();
+ msg.setContext(currentCtx);
+ msg.getTrace().setLevel(6);
+
+ MyResponseHandler responseHandler = MyResponseHandler.newInstance();
+ driver.sendMessage(msg, responseHandler);
+
+ Response response = responseHandler.awaitResponse();
+ assertNotNull(response);
+ assertTrue(response.getClass().getName(), response instanceof MbusResponse);
+ Reply reply = ((MbusResponse)response).getReply();
+ assertSame(currentCtx, reply.getContext());
+ assertEquals(6, reply.getTrace().getLevel());
+ assertSame(pushedHandler, reply.popHandler());
+ assertSame(pushedCtx, reply.getContext());
+ assertTrue(driver.close());
+ }
+
+ @Test
+ public void requireThatStateIsTransferredToTimeoutResponse() throws InterruptedException {
+ ClientTestDriver driver = ClientTestDriver.newInstance();
+ MyResponseHandler responseHandler = MyResponseHandler.newInstance();
+
+ Message msg = new SimpleMessage("foo");
+ ReplyHandler pushedHandler = new MyReplyHandler();
+ Object pushedCtx = new Object();
+ msg.setContext(pushedCtx);
+ msg.pushHandler(pushedHandler);
+ Object currentCtx = new Object();
+ msg.setContext(currentCtx);
+ msg.getTrace().setLevel(6);
+
+ Request request = driver.newClientRequest(msg);
+ request.setTimeout(1, TimeUnit.MILLISECONDS);
+ assertTrue(driver.sendRequest(request, responseHandler));
+ request.release();
+
+ Response response = responseHandler.awaitResponse();
+ assertNotNull(response);
+ assertTrue(response.getClass().getName(), response instanceof MbusResponse);
+ Reply reply = ((MbusResponse)response).getReply();
+ assertSame(currentCtx, reply.getContext());
+ assertEquals(6, reply.getTrace().getLevel());
+ assertSame(pushedHandler, reply.popHandler());
+ assertSame(pushedCtx, reply.getContext());
+ assertTrue(driver.close());
+ }
+
+ @Test
+ public void requireThatSyncMbusSendFailureRespondsWithError() {
+ ClientTestDriver driver = ClientTestDriver.newInstance();
+ driver.sourceSession().close();
+
+ MyResponseHandler responseHandler = MyResponseHandler.newInstance();
+ driver.sendMessage(new SimpleMessage("foo"), responseHandler);
+ Response response = responseHandler.awaitResponse();
+ assertNotNull(response);
+ assertTrue(response.getClass().getName(), response instanceof MbusResponse);
+ Reply reply = ((MbusResponse)response).getReply();
+ assertEquals(1, reply.getNumErrors());
+ assertEquals(ErrorCode.SEND_QUEUE_CLOSED, reply.getError(0).getCode());
+ assertTrue(driver.close());
+ }
+
+ private static class MyResponseHandler implements ResponseHandler {
+
+ final MyResponseContent content;
+ Response response;
+
+ MyResponseHandler(MyResponseContent content) {
+ this.content = content;
+ }
+
+ Response awaitResponse() {
+ try {
+ content.closeLatch.await(60, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ throw new IllegalStateException(e);
+ }
+ if (response instanceof MbusResponse) {
+ //System.out.println(((MbusResponse)response).getReply().getTrace());
+ }
+ return response;
+ }
+
+ @Override
+ public ContentChannel handleResponse(Response response) {
+ this.response = response;
+ return content;
+ }
+
+ static MyResponseHandler newInstance() {
+ return new MyResponseHandler(new MyResponseContent());
+ }
+ }
+
+ private static class MyResponseContent implements ContentChannel {
+
+ final CountDownLatch writeLatch = new CountDownLatch(1);
+ final CountDownLatch closeLatch = new CountDownLatch(1);
+
+ @Override
+ public void write(ByteBuffer buf, CompletionHandler handler) {
+ if (handler != null) {
+ handler.completed();
+ }
+ writeLatch.countDown();
+ }
+
+ @Override
+ public void close(CompletionHandler handler) {
+ if (handler != null) {
+ handler.completed();
+ }
+ closeLatch.countDown();
+ }
+ }
+
+ private static class MySession implements ClientSession {
+
+ int refCount = 1;
+
+ @Override
+ public Result sendMessage(Message msg) {
+ return null;
+ }
+
+ @Override
+ public ResourceReference refer() {
+ ++refCount;
+ return new ResourceReference() {
+ @Override
+ public void close() {
+ --refCount;
+ }
+ };
+ }
+
+ @Override
+ public void release() {
+ --refCount;
+ }
+ }
+
+ private static class MyReplyHandler implements ReplyHandler {
+
+ @Override
+ public void handleReply(Reply reply) {
+
+ }
+ }
+}
diff --git a/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusRequestHandlerTestCase.java b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusRequestHandlerTestCase.java
new file mode 100644
index 00000000000..316ad18bae9
--- /dev/null
+++ b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusRequestHandlerTestCase.java
@@ -0,0 +1,121 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.jdisc;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.yahoo.jdisc.Request;
+import com.yahoo.jdisc.Response;
+import com.yahoo.jdisc.application.ContainerBuilder;
+import com.yahoo.jdisc.handler.RequestDispatch;
+import com.yahoo.jdisc.test.TestDriver;
+import com.yahoo.messagebus.EmptyReply;
+import com.yahoo.messagebus.Message;
+import com.yahoo.messagebus.Reply;
+import com.yahoo.messagebus.test.SimpleMessage;
+import org.junit.Test;
+
+import java.net.URI;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * @author Simon Thoresen Hult
+ */
+public class MbusRequestHandlerTestCase {
+
+ @Test
+ public void requireThatNonMbusRequestThrows() throws Exception {
+ final TestDriver driver = newTestDriver(SameThreadReplier.INSTANCE);
+ try {
+ new RequestDispatch() {
+
+ @Override
+ protected Request newRequest() {
+ return new Request(driver, URI.create("mbus://localhost/"));
+ }
+ }.connect();
+ fail();
+ } catch (UnsupportedOperationException e) {
+ assertEquals("Expected MbusRequest, got com.yahoo.jdisc.Request.", e.getMessage());
+ }
+ assertTrue(driver.close());
+ }
+
+ @Test
+ public void requireThatHandlerCanRespondInSameThread() throws Exception {
+ TestDriver driver = newTestDriver(SameThreadReplier.INSTANCE);
+
+ Response response = dispatchMessage(driver, new SimpleMessage("msg")).get(60, TimeUnit.SECONDS);
+ assertTrue(response instanceof MbusResponse);
+ assertEquals(Response.Status.OK, response.getStatus());
+ Reply reply = ((MbusResponse)response).getReply();
+ assertTrue(reply instanceof EmptyReply);
+ assertFalse(reply.hasErrors());
+
+ assertTrue(driver.close());
+ }
+
+ @Test
+ public void requireThatHandlerCanRespondInOtherThread() throws Exception {
+ TestDriver driver = newTestDriver(ThreadedReplier.INSTANCE);
+
+ Response response = dispatchMessage(driver, new SimpleMessage("msg")).get(60, TimeUnit.SECONDS);
+ assertTrue(response instanceof MbusResponse);
+ assertEquals(Response.Status.OK, response.getStatus());
+ Reply reply = ((MbusResponse)response).getReply();
+ assertTrue(reply instanceof EmptyReply);
+ assertFalse(reply.hasErrors());
+
+ assertTrue(driver.close());
+ }
+
+ private static TestDriver newTestDriver(MbusRequestHandler handler) {
+ TestDriver driver = TestDriver.newSimpleApplicationInstanceWithoutOsgi();
+ ContainerBuilder builder = driver.newContainerBuilder();
+ builder.serverBindings().bind("mbus://*/*", handler);
+ driver.activateContainer(builder);
+ return driver;
+ }
+
+ private static ListenableFuture<Response> dispatchMessage(final TestDriver driver, final Message msg) {
+ return new RequestDispatch() {
+
+ @Override
+ protected Request newRequest() {
+ return new MbusRequest(driver, URI.create("mbus://localhost/"), msg);
+ }
+ }.dispatch();
+ }
+
+ private static class SameThreadReplier extends MbusRequestHandler {
+
+ final static SameThreadReplier INSTANCE = new SameThreadReplier();
+
+ @Override
+ public void handleMessage(Message msg) {
+ Reply reply = new EmptyReply();
+ reply.swapState(msg);
+ reply.popHandler().handleReply(reply);
+ }
+ }
+
+ private static class ThreadedReplier extends MbusRequestHandler {
+
+ final static ThreadedReplier INSTANCE = new ThreadedReplier();
+
+ @Override
+ public void handleMessage(final Message msg) {
+ Executors.newSingleThreadExecutor().execute(new Runnable() {
+
+ @Override
+ public void run() {
+ SameThreadReplier.INSTANCE.handleMessage(msg);
+ }
+ });
+ }
+ }
+}
diff --git a/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusRequestTestCase.java b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusRequestTestCase.java
new file mode 100644
index 00000000000..c68ab4e6742
--- /dev/null
+++ b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusRequestTestCase.java
@@ -0,0 +1,73 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.jdisc;
+
+import com.yahoo.jdisc.test.TestDriver;
+import com.yahoo.messagebus.Message;
+import com.yahoo.messagebus.test.SimpleMessage;
+import com.yahoo.text.Utf8String;
+import org.junit.Test;
+
+import java.net.URI;
+
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * @author Simon Thoresen Hult
+ */
+public class MbusRequestTestCase {
+
+ @Test
+ public void requireThatAccessorsWork() {
+ TestDriver driver = TestDriver.newSimpleApplicationInstanceWithoutOsgi();
+ driver.activateContainer(driver.newContainerBuilder());
+
+ MyMessage msg = new MyMessage();
+ MbusRequest request = new MbusRequest(driver, URI.create("mbus://host/path"), msg);
+ assertSame(msg, request.getMessage());
+ request.release();
+ driver.close();
+ }
+
+ @Test
+ public void requireThatMessageCanNotBeNullInRootRequest() {
+ TestDriver driver = TestDriver.newSimpleApplicationInstanceWithoutOsgi();
+ driver.activateContainer(driver.newContainerBuilder());
+ try {
+ new MbusRequest(driver, URI.create("mbus://host/path"), null);
+ fail();
+ } catch (NullPointerException e) {
+ // expected
+ }
+ assertTrue(driver.close());
+ }
+
+ @Test
+ public void requireThatMessageCanNotBeNullInChildRequest() {
+ TestDriver driver = TestDriver.newSimpleApplicationInstanceWithoutOsgi();
+ driver.activateContainer(driver.newContainerBuilder());
+ MbusRequest parent = new MbusRequest(driver, URI.create("mbus://host/path"), new SimpleMessage("foo"));
+ try {
+ new MbusRequest(parent, URI.create("mbus://host/path"), null);
+ fail();
+ } catch (NullPointerException e) {
+ // expected
+ }
+ parent.release();
+ assertTrue(driver.close());
+ }
+
+ private class MyMessage extends Message {
+
+ @Override
+ public Utf8String getProtocol() {
+ return null;
+ }
+
+ @Override
+ public int getType() {
+ return 0;
+ }
+ }
+}
diff --git a/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusResponseTestCase.java b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusResponseTestCase.java
new file mode 100644
index 00000000000..eb4cb949770
--- /dev/null
+++ b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusResponseTestCase.java
@@ -0,0 +1,46 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.jdisc;
+
+import com.yahoo.jdisc.Response;
+import com.yahoo.messagebus.Reply;
+import com.yahoo.text.Utf8String;
+import org.junit.Test;
+
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.fail;
+
+/**
+ * @author Simon Thoresen Hult
+ */
+public class MbusResponseTestCase {
+
+ @Test
+ public void requireThatAccessorsWork() {
+ MyReply reply = new MyReply();
+ MbusResponse response = new MbusResponse(Response.Status.OK, reply);
+ assertSame(reply, response.getReply());
+ }
+
+ @Test
+ public void requireThatReplyCanNotBeNull() {
+ try {
+ new MbusResponse(Response.Status.OK, null);
+ fail();
+ } catch (NullPointerException e) {
+
+ }
+ }
+
+ private class MyReply extends Reply {
+
+ @Override
+ public Utf8String getProtocol() {
+ return null;
+ }
+
+ @Override
+ public int getType() {
+ return 0;
+ }
+ }
+}
diff --git a/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusServerConformanceTest.java b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusServerConformanceTest.java
new file mode 100644
index 00000000000..bf89f3869ed
--- /dev/null
+++ b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusServerConformanceTest.java
@@ -0,0 +1,694 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.jdisc;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Module;
+import com.yahoo.jdisc.test.ServerProviderConformanceTest;
+import com.yahoo.messagebus.DestinationSessionParams;
+import com.yahoo.messagebus.MessageBus;
+import com.yahoo.messagebus.MessageBusParams;
+import com.yahoo.messagebus.Reply;
+import com.yahoo.messagebus.ReplyHandler;
+import com.yahoo.messagebus.SourceSession;
+import com.yahoo.messagebus.SourceSessionParams;
+import com.yahoo.messagebus.network.local.LocalNetwork;
+import com.yahoo.messagebus.network.local.LocalWire;
+import com.yahoo.messagebus.routing.Route;
+import com.yahoo.messagebus.shared.ServerSession;
+import com.yahoo.messagebus.shared.SharedMessageBus;
+import com.yahoo.messagebus.test.SimpleMessage;
+import com.yahoo.messagebus.test.SimpleProtocol;
+import org.hamcrest.Matcher;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+
+import static com.yahoo.messagebus.ErrorCode.APP_FATAL_ERROR;
+import static com.yahoo.messagebus.ErrorCode.SESSION_BUSY;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/**
+ * @author Simon Thoresen Hult
+ */
+public class MbusServerConformanceTest extends ServerProviderConformanceTest {
+
+ /* Many of the "success" expectations here (may) seem odd. But this is the current behavior of the
+ * messagebus server. We should probably look into whether the behavior is correct in all cases.
+ */
+
+ @Override
+ @Test
+ public void testContainerNotReadyException() throws Throwable {
+ new TestRunner().setRequestTimeout(100, TimeUnit.MILLISECONDS)
+ .expectError(is(SESSION_BUSY))
+ .executeAndClose();
+ }
+
+ @Override
+ @Test
+ public void testBindingSetNotFoundException() throws Throwable {
+ new TestRunner().expectError(is(APP_FATAL_ERROR))
+ .executeAndClose();
+ }
+
+ @Override
+ @Test
+ public void testNoBindingSetSelectedException() throws Throwable {
+ new TestRunner().expectError(is(APP_FATAL_ERROR))
+ .executeAndClose();
+ }
+
+ @Override
+ @Test
+ public void testBindingNotFoundException() throws Throwable {
+ new TestRunner().expectError(is(APP_FATAL_ERROR))
+ .executeAndClose();
+ }
+
+ @Override
+ @Test
+ public void testRequestHandlerWithSyncCloseResponse() throws Throwable {
+ new TestRunner().expectSuccess()
+ .executeAndClose();
+ }
+
+ @Override
+ @Test
+ public void testRequestHandlerWithSyncWriteResponse() throws Throwable {
+ new TestRunner().expectSuccess()
+ .executeAndClose();
+ }
+
+ @Override
+ @Test
+ public void testRequestHandlerWithSyncHandleResponse() throws Throwable {
+ new TestRunner().expectSuccess()
+ .executeAndClose();
+ }
+
+ @Override
+ @Test
+ public void testRequestHandlerWithAsyncHandleResponse() throws Throwable {
+ new TestRunner().expectSuccess()
+ .executeAndClose();
+ }
+
+ @Override
+ @Test
+ public void testRequestException() throws Throwable {
+ new TestRunner().expectError(is(APP_FATAL_ERROR))
+ .executeAndClose();
+ }
+
+ @Override
+ @Test
+ public void testRequestExceptionWithSyncCloseResponse() throws Throwable {
+ new TestRunner().expectSuccess()
+ .executeAndClose();
+ }
+
+ @Override
+ @Test
+ public void testRequestExceptionWithSyncWriteResponse() throws Throwable {
+ new TestRunner().expectSuccess()
+ .executeAndClose();
+ }
+
+ @Override
+ @Test
+ public void testRequestNondeterministicExceptionWithSyncHandleResponse() throws Throwable {
+ new TestRunner().expectSuccess()
+ .executeAndClose();
+ }
+
+ @Override
+ @Test
+ public void testRequestExceptionBeforeResponseWriteWithSyncHandleResponse() throws Throwable {
+ new TestRunner().expectSuccess()
+ .executeAndClose();
+ }
+
+ @Override
+ @Test
+ @Ignore // N/A: The messagebus protocol does not have content.
+ public void testRequestExceptionAfterResponseWriteWithSyncHandleResponse() throws Throwable {
+ }
+
+ @Override
+ @Test
+ public void testRequestNondeterministicExceptionWithAsyncHandleResponse() throws Throwable {
+ new TestRunner().executeAndClose();
+ }
+
+ @Override
+ @Test
+ public void testRequestExceptionBeforeResponseWriteWithAsyncHandleResponse() throws Throwable {
+ new TestRunner().expectError(is(APP_FATAL_ERROR))
+ .executeAndClose();
+ }
+
+ @Override
+ @Test
+ public void testRequestExceptionAfterResponseCloseNoContentWithAsyncHandleResponse() throws Throwable {
+ new TestRunner().expectSuccess()
+ .executeAndClose();
+ }
+
+ @Override
+ @Test
+ @Ignore // N/A: The messagebus protocol does not have content.
+ public void testRequestExceptionAfterResponseWriteWithAsyncHandleResponse() throws Throwable {
+ }
+
+ @Override
+ @Test
+ @Ignore // N/A: The messagebus protocol does not have content.
+ public void testRequestContentWriteWithSyncCompletion() throws Throwable {
+ }
+
+ @Override
+ @Test
+ @Ignore // N/A: The messagebus protocol does not have content.
+ public void testRequestContentWriteWithAsyncCompletion() throws Throwable {
+ }
+
+ @Override
+ @Test
+ @Ignore // N/A: The messagebus protocol does not have content.
+ public void testRequestContentWriteWithNondeterministicSyncFailure() throws Throwable {
+ }
+
+ @Override
+ @Test
+ @Ignore // N/A: The messagebus protocol does not have content.
+ public void testRequestContentWriteWithSyncFailureBeforeResponseWrite() throws Throwable {
+ }
+
+ @Override
+ @Test
+ @Ignore // N/A: The messagebus protocol does not have content.
+ public void testRequestContentWriteWithSyncFailureAfterResponseWrite() throws Throwable {
+ }
+
+ @Override
+ @Test
+ @Ignore // N/A: The messagebus protocol does not have content.
+ public void testRequestContentWriteWithNondeterministicAsyncFailure() throws Throwable {
+ }
+
+ @Override
+ @Test
+ @Ignore // N/A: The messagebus protocol does not have content.
+ public void testRequestContentWriteWithAsyncFailureBeforeResponseWrite() throws Throwable {
+ }
+
+ @Override
+ @Test
+ @Ignore // N/A: The messagebus protocol does not have content.
+ public void testRequestContentWriteWithAsyncFailureAfterResponseWrite() throws Throwable {
+ }
+
+ @Override
+ @Test
+ @Ignore // N/A: The messagebus protocol does not have content.
+ public void testRequestContentWriteWithAsyncFailureAfterResponseCloseNoContent() throws Throwable {
+ }
+
+ @Override
+ @Test
+ @Ignore // N/A: The messagebus protocol does not have content.
+ public void testRequestContentWriteNondeterministicException() throws Throwable {
+ }
+
+ @Override
+ @Test
+ @Ignore // N/A: The messagebus protocol does not have content.
+ public void testRequestContentWriteExceptionBeforeResponseWrite() throws Throwable {
+ }
+
+ @Override
+ @Test
+ @Ignore // N/A: The messagebus protocol does not have content.
+ public void testRequestContentWriteExceptionAfterResponseWrite() throws Throwable {
+ }
+
+ @Override
+ @Test
+ @Ignore // N/A: The messagebus protocol does not have content.
+ public void testRequestContentWriteExceptionAfterResponseCloseNoContent() throws Throwable {
+ }
+
+ @Override
+ @Test
+ @Ignore // N/A: The messagebus protocol does not have content.
+ public void testRequestContentWriteNondeterministicExceptionWithSyncCompletion() throws Throwable {
+ }
+
+ @Override
+ @Test
+ @Ignore // N/A: The messagebus protocol does not have content.
+ public void testRequestContentWriteExceptionBeforeResponseWriteWithSyncCompletion() throws Throwable {
+ }
+
+ @Override
+ @Test
+ @Ignore // N/A: The messagebus protocol does not have content.
+ public void testRequestContentWriteExceptionAfterResponseWriteWithSyncCompletion() throws Throwable {
+ }
+
+ @Override
+ @Test
+ @Ignore // N/A: The messagebus protocol does not have content.
+ public void testRequestContentWriteExceptionAfterResponseCloseNoContentWithSyncCompletion() throws Throwable {
+ }
+
+ @Override
+ @Test
+ @Ignore // N/A: The messagebus protocol does not have content.
+ public void testRequestContentWriteNondeterministicExceptionWithAsyncCompletion() throws Throwable {
+ }
+
+ @Override
+ @Test
+ @Ignore // N/A: The messagebus protocol does not have content.
+ public void testRequestContentWriteExceptionBeforeResponseWriteWithAsyncCompletion() throws Throwable {
+ }
+
+ @Override
+ @Test
+ @Ignore // N/A: The messagebus protocol does not have content.
+ public void testRequestContentWriteExceptionAfterResponseWriteWithAsyncCompletion() throws Throwable {
+ }
+
+ @Override
+ @Test
+ @Ignore // N/A: The messagebus protocol does not have content.
+ public void testRequestContentWriteExceptionAfterResponseCloseNoContentWithAsyncCompletion() throws Throwable {
+ }
+
+ @Override
+ @Test
+ @Ignore // N/A: The messagebus protocol does not have content.
+ public void testRequestContentWriteExceptionWithNondeterministicSyncFailure() throws Throwable {
+ }
+
+ @Override
+ @Test
+ @Ignore // N/A: The messagebus protocol does not have content.
+ public void testRequestContentWriteExceptionWithSyncFailureBeforeResponseWrite() throws Throwable {
+ }
+
+ @Override
+ @Test
+ @Ignore // N/A: The messagebus protocol does not have content.
+ public void testRequestContentWriteExceptionWithSyncFailureAfterResponseWrite() throws Throwable {
+ }
+
+ @Override
+ @Test
+ @Ignore // N/A: The messagebus protocol does not have content.
+ public void testRequestContentWriteExceptionWithSyncFailureAfterResponseCloseNoContent() throws Throwable {
+ }
+
+ @Override
+ @Test
+ @Ignore // N/A: The messagebus protocol does not have content.
+ public void testRequestContentWriteExceptionWithNondeterministicAsyncFailure() throws Throwable {
+ }
+
+ @Override
+ @Test
+ @Ignore // N/A: The messagebus protocol does not have content.
+ public void testRequestContentWriteExceptionWithAsyncFailureBeforeResponseWrite() throws Throwable {
+ }
+
+ @Override
+ @Test
+ @Ignore // N/A: The messagebus protocol does not have content.
+ public void testRequestContentWriteExceptionWithAsyncFailureAfterResponseWrite() throws Throwable {
+ }
+
+ @Override
+ @Test
+ @Ignore // N/A: The messagebus protocol does not have content.
+ public void testRequestContentWriteExceptionWithAsyncFailureAfterResponseCloseNoContent() throws Throwable {
+ }
+
+ @Override
+ @Test
+ public void testRequestContentCloseWithSyncCompletion() throws Throwable {
+ new TestRunner().expectSuccess()
+ .executeAndClose();
+ }
+
+ @Override
+ @Test
+ public void testRequestContentCloseWithAsyncCompletion() throws Throwable {
+ new TestRunner().expectSuccess()
+ .executeAndClose();
+ }
+
+ @Override
+ @Test
+ public void testRequestContentCloseWithNondeterministicSyncFailure() throws Throwable {
+ new TestRunner().expectSuccess()
+ .executeAndClose();
+ }
+
+ @Override
+ @Test
+ public void testRequestContentCloseWithSyncFailureBeforeResponseWrite() throws Throwable {
+ new TestRunner().expectSuccess()
+ .executeAndClose();
+ }
+
+ @Override
+ @Test
+ @Ignore // N/A: The messagebus protocol does not have content.
+ public void testRequestContentCloseWithSyncFailureAfterResponseWrite() throws Throwable {
+ }
+
+ @Override
+ @Test
+ public void testRequestContentCloseWithSyncFailureAfterResponseCloseNoContent() throws Throwable {
+ new TestRunner().expectSuccess()
+ .executeAndClose();
+ }
+
+ @Override
+ @Test
+ public void testRequestContentCloseWithNondeterministicAsyncFailure() throws Throwable {
+ new TestRunner().expectSuccess()
+ .executeAndClose();
+ }
+
+ @Override
+ @Test
+ public void testRequestContentCloseWithAsyncFailureBeforeResponseWrite() throws Throwable {
+ new TestRunner().expectSuccess()
+ .executeAndClose();
+ }
+
+ @Override
+ @Test
+ @Ignore // N/A: The messagebus protocol does not have content.
+ public void testRequestContentCloseWithAsyncFailureAfterResponseWrite() throws Throwable {
+ }
+
+ @Override
+ @Test
+ public void testRequestContentCloseWithAsyncFailureAfterResponseCloseNoContent() throws Throwable {
+ new TestRunner().expectSuccess()
+ .executeAndClose();
+ }
+
+ @Override
+ @Test
+ public void testRequestContentCloseNondeterministicException() throws Throwable {
+ new TestRunner().expectSuccess()
+ .executeAndClose();
+ }
+
+ @Override
+ @Test
+ public void testRequestContentCloseExceptionBeforeResponseWrite() throws Throwable {
+ new TestRunner().expectSuccess()
+ .executeAndClose();
+ }
+
+ @Override
+ @Test
+ @Ignore // N/A: The messagebus protocol does not have content.
+ public void testRequestContentCloseExceptionAfterResponseWrite() throws Throwable {
+ }
+
+ @Override
+ @Test
+ public void testRequestContentCloseExceptionAfterResponseCloseNoContent() throws Throwable {
+ new TestRunner().expectSuccess()
+ .executeAndClose();
+ }
+
+ @Override
+ @Test
+ public void testRequestContentCloseNondeterministicExceptionWithSyncCompletion() throws Throwable {
+ new TestRunner().expectSuccess()
+ .executeAndClose();
+ }
+
+ @Override
+ @Test
+ public void testRequestContentCloseExceptionBeforeResponseWriteWithSyncCompletion() throws Throwable {
+ new TestRunner().expectSuccess()
+ .executeAndClose();
+ }
+
+ @Override
+ @Test
+ @Ignore // N/A: The messagebus protocol does not have content.
+ public void testRequestContentCloseExceptionAfterResponseWriteWithSyncCompletion() throws Throwable {
+ }
+
+ @Override
+ @Test
+ public void testRequestContentCloseExceptionAfterResponseCloseNoContentWithSyncCompletion() throws Throwable {
+ new TestRunner().expectSuccess()
+ .executeAndClose();
+ }
+
+ @Override
+ @Test
+ public void testRequestContentCloseNondeterministicExceptionWithAsyncCompletion() throws Throwable {
+ new TestRunner().expectSuccess()
+ .executeAndClose();
+ }
+
+ @Override
+ @Test
+ public void testRequestContentCloseExceptionBeforeResponseWriteWithAsyncCompletion() throws Throwable {
+ new TestRunner().expectSuccess()
+ .executeAndClose();
+ }
+
+ @Override
+ @Test
+ @Ignore // N/A: The messagebus protocol does not have content.
+ public void testRequestContentCloseExceptionAfterResponseWriteWithAsyncCompletion() throws Throwable {
+ }
+
+ @Override
+ @Test
+ public void testRequestContentCloseExceptionAfterResponseCloseNoContentWithAsyncCompletion() throws Throwable {
+ new TestRunner().expectSuccess()
+ .executeAndClose();
+ }
+
+ @Override
+ @Test
+ public void testRequestContentCloseNondeterministicExceptionWithSyncFailure() throws Throwable {
+ new TestRunner().expectSuccess()
+ .executeAndClose();
+ }
+
+ @Override
+ @Test
+ public void testRequestContentCloseExceptionBeforeResponseWriteWithSyncFailure() throws Throwable {
+ new TestRunner().expectSuccess()
+ .executeAndClose();
+ }
+
+ @Override
+ @Test
+ @Ignore // N/A: The messagebus protocol does not have content.
+ public void testRequestContentCloseExceptionAfterResponseWriteWithSyncFailure() throws Throwable {
+ }
+
+ @Override
+ @Test
+ public void testRequestContentCloseExceptionAfterResponseCloseNoContentWithSyncFailure() throws Throwable {
+ new TestRunner().expectSuccess()
+ .executeAndClose();
+ }
+
+ @Override
+ @Test
+ public void testRequestContentCloseNondeterministicExceptionWithAsyncFailure() throws Throwable {
+ new TestRunner().expectSuccess()
+ .executeAndClose();
+ }
+
+ @Override
+ @Test
+ public void testRequestContentCloseExceptionBeforeResponseWriteWithAsyncFailure() throws Throwable {
+ new TestRunner().expectSuccess()
+ .executeAndClose();
+ }
+
+ @Override
+ @Test
+ @Ignore // N/A: The messagebus protocol does not have content.
+ public void testRequestContentCloseExceptionAfterResponseWriteWithAsyncFailure() throws Throwable {
+ }
+
+ @Override
+ @Test
+ public void testRequestContentCloseExceptionAfterResponseCloseNoContentWithAsyncFailure() throws Throwable {
+ new TestRunner().expectSuccess()
+ .executeAndClose();
+ }
+
+ @Override
+ @Test
+ @Ignore // N/A: The messagebus protocol does not have content.
+ public void testResponseWriteCompletionException() throws Throwable {
+ }
+
+ @Override
+ @Test
+ public void testResponseCloseCompletionException() throws Throwable {
+ new TestRunner().expectSuccess()
+ .executeAndClose();
+ }
+
+ @Override
+ @Test
+ public void testResponseCloseCompletionExceptionNoContent() throws Throwable {
+ new TestRunner().expectSuccess()
+ .executeAndClose();
+ }
+
+ private class TestRunner implements Adapter<MbusServer, MyClient, Reply> {
+
+ final LocalWire wire = new LocalWire();
+ final SharedMessageBus mbus;
+ final ServerSession session;
+ Matcher<Integer> expectedError = null;
+ boolean successExpected = false;
+ long timeoutMillis = TimeUnit.SECONDS.toMillis(60);
+
+ TestRunner() {
+ this(new MessageBusParams().addProtocol(new SimpleProtocol()),
+ new DestinationSessionParams());
+ }
+
+ TestRunner(MessageBusParams mbusParams, DestinationSessionParams sessionParams) {
+ this.mbus = new SharedMessageBus(new MessageBus(new LocalNetwork(wire), mbusParams));
+ this.session = mbus.newDestinationSession(sessionParams);
+ }
+
+ TestRunner setRequestTimeout(long timeout, TimeUnit unit) {
+ timeoutMillis = unit.toMillis(timeout);
+ return this;
+ }
+
+ TestRunner expectError(Matcher<Integer> matcher) {
+ assertThat(successExpected, is(false));
+ expectedError = matcher;
+ return this;
+ }
+
+ TestRunner expectSuccess() {
+ assertThat(expectedError, is(nullValue()));
+ successExpected = true;
+ return this;
+ }
+
+ @Override
+ public Module newConfigModule() {
+ return new AbstractModule() {
+
+ @Override
+ protected void configure() {
+ bind(ServerSession.class).toInstance(session);
+ }
+ };
+ }
+
+ @Override
+ public Class<MbusServer> getServerProviderClass() {
+ return MbusServer.class;
+ }
+
+ @Override
+ public MyClient newClient(MbusServer server) throws Throwable {
+ return new MyClient(wire, server.connectionSpec());
+ }
+
+ @Override
+ public Reply executeRequest(MyClient client, boolean withRequestContent) throws Throwable {
+ // This protocol doesn't have the concept of "request content", so if we are asked to send any, it's a bug.
+ assertThat(withRequestContent, is(false));
+
+ final SimpleMessage msg = new SimpleMessage("foo");
+ msg.getTrace().setLevel(9);
+ msg.setRoute(client.route);
+ msg.setTimeRemaining(timeoutMillis);
+ assertThat("client.session.send(msg).isAccepted()",
+ client.session.send(msg).isAccepted(), is(true));
+
+ final Reply reply = client.replies.poll(60, TimeUnit.SECONDS);
+ assertThat("reply != null", reply, notNullValue());
+ return reply;
+ }
+
+ @Override
+ public Iterable<ByteBuffer> newResponseContent() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public void validateResponse(Reply reply) throws Throwable {
+ final String trace = String.valueOf(reply.getTrace());
+ if (expectedError != null) {
+ assertThat(reply.hasErrors(), is(true));
+ final int error = reply.getError(0).getCode();
+ assertThat(trace, error, expectedError);
+ }
+ if (successExpected) {
+ assertThat(trace, reply.hasErrors(), is(false));
+ }
+ }
+
+ void executeAndClose() throws Throwable {
+ runTest(this);
+ session.release();
+ mbus.release();
+ }
+ }
+
+ public static class MyClient implements Closeable, ReplyHandler {
+
+ final BlockingDeque<Reply> replies = new LinkedBlockingDeque<>();
+ final MessageBus mbus;
+ final Route route;
+ final SourceSession session;
+
+ MyClient(LocalWire wire, String connectionSpec) {
+ this.mbus = new MessageBus(new LocalNetwork(wire),
+ new MessageBusParams().addProtocol(new SimpleProtocol()));
+ this.session = mbus.createSourceSession(new SourceSessionParams().setReplyHandler(this));
+ this.route = Route.parse(connectionSpec);
+ }
+
+ @Override
+ public void close() throws IOException {
+ session.destroy();
+ mbus.destroy();
+ }
+
+ @Override
+ public void handleReply(Reply reply) {
+ replies.addLast(reply);
+ }
+ }
+}
diff --git a/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusServerTestCase.java b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusServerTestCase.java
new file mode 100644
index 00000000000..9d45d2e7abf
--- /dev/null
+++ b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusServerTestCase.java
@@ -0,0 +1,374 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.jdisc;
+
+import com.google.inject.AbstractModule;
+import com.yahoo.jdisc.Request;
+import com.yahoo.jdisc.ResourceReference;
+import com.yahoo.jdisc.Response;
+import com.yahoo.jdisc.application.BindingSetSelector;
+import com.yahoo.jdisc.handler.*;
+import com.yahoo.messagebus.Error;
+import com.yahoo.messagebus.*;
+import com.yahoo.messagebus.jdisc.test.ServerTestDriver;
+import com.yahoo.messagebus.shared.ServerSession;
+import com.yahoo.messagebus.test.SimpleMessage;
+import com.yahoo.messagebus.test.SimpleReply;
+import org.junit.Test;
+
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.*;
+
+/**
+ * @author Simon Thoresen Hult
+ */
+public class MbusServerTestCase {
+
+ @Test
+ public void requireThatServerRetainsSession() {
+ MySession session = new MySession();
+ assertEquals(1, session.refCount);
+ MbusServer server = new MbusServer(null, session);
+ assertEquals(2, session.refCount);
+ session.release();
+ assertEquals(1, session.refCount);
+ server.destroy();
+ assertEquals(0, session.refCount);
+ }
+
+ @Test
+ public void requireThatNoBindingSetSelectedExceptionIsCaught() {
+ ServerTestDriver driver = ServerTestDriver.newUnboundInstance(true, new MySelector(null));
+ assertTrue(driver.sendMessage(new SimpleMessage("foo")));
+ assertNotNull(driver.awaitErrors(ErrorCode.APP_FATAL_ERROR));
+ assertTrue(driver.close());
+ }
+
+ @Test
+ public void requireThatBindingSetNotFoundExceptionIsCaught() {
+ ServerTestDriver driver = ServerTestDriver.newUnboundInstance(true, new MySelector("foo"));
+ assertTrue(driver.sendMessage(new SimpleMessage("bar")));
+ assertNotNull(driver.awaitErrors(ErrorCode.APP_FATAL_ERROR));
+ assertTrue(driver.close());
+ }
+
+ @Test
+ public void requireThatContainerNotReadyExceptionIsCaught() {
+ ServerTestDriver driver = ServerTestDriver.newInactiveInstance(true);
+ assertTrue(driver.sendMessage(new SimpleMessage("foo")));
+ assertNotNull(driver.awaitErrors(ErrorCode.APP_FATAL_ERROR));
+ assertTrue(driver.close());
+ }
+
+ @Test
+ public void requireThatBindingNotFoundExceptionIsCaught() {
+ ServerTestDriver driver = ServerTestDriver.newUnboundInstance(true);
+ assertTrue(driver.sendMessage(new SimpleMessage("foo")));
+ assertNotNull(driver.awaitErrors(ErrorCode.APP_FATAL_ERROR));
+ assertTrue(driver.close());
+ }
+
+ @Test
+ public void requireThatRequestDeniedExceptionIsCaught() {
+ ServerTestDriver driver = ServerTestDriver.newInstance(MyRequestHandler.newRequestDenied(), true);
+ assertTrue(driver.sendMessage(new SimpleMessage("foo")));
+ assertNotNull(driver.awaitErrors(ErrorCode.APP_FATAL_ERROR));
+ assertTrue(driver.close());
+ }
+
+ @Test
+ public void requireThatRequestResponseWorks() {
+ MyRequestHandler requestHandler = MyRequestHandler.newInstance();
+ ServerTestDriver driver = ServerTestDriver.newInstance(requestHandler, true);
+ assertTrue(driver.sendMessage(new SimpleMessage("foo")));
+
+ assertNotNull(requestHandler.awaitRequest());
+ assertTrue(requestHandler.sendResponse(new Response(Response.Status.OK)));
+
+ assertNotNull(driver.awaitSuccess());
+ assertTrue(driver.close());
+ }
+
+ @Test
+ public void requireThatRequestIsMbus() {
+ MyRequestHandler requestHandler = MyRequestHandler.newInstance();
+ ServerTestDriver driver = ServerTestDriver.newInstance(requestHandler, true);
+ assertTrue(driver.sendMessage(new SimpleMessage("foo")));
+
+ Request request = requestHandler.awaitRequest();
+ assertTrue(request instanceof MbusRequest);
+ Message msg = ((MbusRequest)request).getMessage();
+ assertTrue(msg instanceof SimpleMessage);
+ assertEquals("foo", ((SimpleMessage)msg).getValue());
+ assertTrue(requestHandler.sendResponse(new Response(Response.Status.OK)));
+
+ assertNotNull(driver.awaitSuccess());
+ assertTrue(driver.close());
+ }
+
+ @Test
+ public void requireThatReplyInsideMbusResponseIsUsed() {
+ MyRequestHandler requestHandler = MyRequestHandler.newInstance();
+ ServerTestDriver driver = ServerTestDriver.newInstance(requestHandler, true);
+ assertTrue(driver.sendMessage(new SimpleMessage("foo")));
+
+ assertNotNull(requestHandler.awaitRequest());
+ Reply reply = new SimpleReply("bar");
+ reply.swapState(((MbusRequest)requestHandler.request).getMessage());
+ assertTrue(requestHandler.sendResponse(new MbusResponse(Response.Status.OK, reply)));
+
+ reply = driver.awaitSuccess();
+ assertTrue(reply instanceof SimpleReply);
+ assertEquals("bar", ((SimpleReply)reply).getValue());
+
+ assertTrue(driver.close());
+ }
+
+ @Test
+ public void requireThatNonMbusResponseCausesEmptyReply() {
+ MyRequestHandler requestHandler = MyRequestHandler.newInstance();
+ ServerTestDriver driver = ServerTestDriver.newInstance(requestHandler, true);
+ assertTrue(driver.sendMessage(new SimpleMessage("foo")));
+
+ assertNotNull(requestHandler.awaitRequest());
+ assertTrue(requestHandler.sendResponse(new Response(Response.Status.OK)));
+
+ assertNotNull(driver.awaitSuccess());
+ assertTrue(driver.close());
+ }
+
+ @Test
+ public void requireThatMbusRequestContentCallsCompletion() throws InterruptedException {
+ MyRequestHandler requestHandler = MyRequestHandler.newInstance();
+ ServerTestDriver driver = ServerTestDriver.newInstance(requestHandler, true);
+ assertTrue(driver.sendMessage(new SimpleMessage("foo")));
+
+ assertNotNull(requestHandler.awaitRequest());
+ ContentChannel content = requestHandler.responseHandler.handleResponse(new Response(Response.Status.OK));
+ assertNotNull(content);
+ MyCompletion completion = new MyCompletion();
+ content.close(completion);
+ assertTrue(completion.completedLatch.await(60, TimeUnit.SECONDS));
+
+ assertNotNull(driver.awaitSuccess());
+ assertTrue(driver.close());
+ }
+
+ @Test
+ public void requireThatResponseContentDoesNotSupportWrite() {
+ MyRequestHandler requestHandler = MyRequestHandler.newInstance();
+ ServerTestDriver driver = ServerTestDriver.newInstance(requestHandler, true);
+ assertTrue(driver.sendMessage(new SimpleMessage("foo")));
+
+ assertNotNull(requestHandler.awaitRequest());
+ ContentChannel content = requestHandler.responseHandler.handleResponse(new Response(Response.Status.OK));
+ assertNotNull(content);
+ try {
+ content.write(ByteBuffer.allocate(69), null);
+ fail();
+ } catch (UnsupportedOperationException e) {
+
+ }
+ content.close(null);
+
+ assertNotNull(driver.awaitSuccess());
+ assertTrue(driver.close());
+ }
+
+ @Test
+ public void requireThatResponseErrorCodeDoesNotDuplicateReplyError() {
+ assertError(Collections.<Integer>emptyList(),
+ Response.Status.OK);
+ assertError(Arrays.asList(ErrorCode.APP_FATAL_ERROR),
+ Response.Status.BAD_REQUEST);
+ assertError(Arrays.asList(ErrorCode.FATAL_ERROR),
+ Response.Status.BAD_REQUEST, ErrorCode.FATAL_ERROR);
+ assertError(Arrays.asList(ErrorCode.TRANSIENT_ERROR, ErrorCode.APP_FATAL_ERROR),
+ Response.Status.BAD_REQUEST, ErrorCode.TRANSIENT_ERROR);
+ assertError(Arrays.asList(ErrorCode.FATAL_ERROR, ErrorCode.TRANSIENT_ERROR),
+ Response.Status.BAD_REQUEST, ErrorCode.FATAL_ERROR, ErrorCode.TRANSIENT_ERROR);
+ }
+
+ private static void assertError(List<Integer> expectedErrors, int responseStatus, int... responseErrors) {
+ MyRequestHandler requestHandler = MyRequestHandler.newInstance();
+ ServerTestDriver driver = ServerTestDriver.newInstance(requestHandler, true);
+ assertTrue(driver.sendMessage(new SimpleMessage("foo")));
+
+ assertNotNull(requestHandler.awaitRequest());
+ Reply reply = new SimpleReply("bar");
+ reply.swapState(((MbusRequest)requestHandler.request).getMessage());
+ for (int err : responseErrors) {
+ reply.addError(new Error(err, "err"));
+ }
+ assertTrue(requestHandler.sendResponse(new MbusResponse(responseStatus, reply)));
+
+ assertNotNull(reply = driver.awaitReply());
+ List<Integer> actual = new LinkedList<>();
+ for (int i = 0; i < reply.getNumErrors(); ++i) {
+ actual.add(reply.getError(i).getCode());
+ }
+ assertEquals(expectedErrors, actual);
+ assertTrue(driver.close());
+ }
+
+ private static class MySelector extends AbstractModule implements BindingSetSelector {
+
+ final String bindingSet;
+
+ MySelector(String bindingSet) {
+ this.bindingSet = bindingSet;
+ }
+
+ @Override
+ protected void configure() {
+ bind(BindingSetSelector.class).toInstance(this);
+ }
+
+ @Override
+ public String select(URI uri) {
+ return bindingSet;
+ }
+ }
+
+ private static class MyRequestHandler extends AbstractRequestHandler {
+
+ final MyRequestContent content;
+ Request request;
+ ResponseHandler responseHandler;
+
+ MyRequestHandler(MyRequestContent content) {
+ this.content = content;
+ }
+
+ @Override
+ public ContentChannel handleRequest(Request request, ResponseHandler responseHandler) {
+ this.request = request;
+ this.responseHandler = responseHandler;
+ if (content == null) {
+ throw new RequestDeniedException(request);
+ }
+ return content;
+ }
+
+ Request awaitRequest() {
+ try {
+ if (!content.closeLatch.await(60, TimeUnit.SECONDS)) {
+ return null;
+ }
+ } catch (InterruptedException e) {
+ throw new IllegalStateException(e);
+ }
+ if (request instanceof MbusRequest) {
+ ((MbusRequest)request).getMessage().getTrace().trace(0, "Request received by DISC.");
+ }
+ return request;
+ }
+
+ boolean sendResponse(Response response) {
+ ContentChannel content = responseHandler.handleResponse(response);
+ if (content == null) {
+ return false;
+ }
+ content.close(null);
+ return true;
+ }
+
+ static MyRequestHandler newInstance() {
+ return new MyRequestHandler(new MyRequestContent());
+ }
+
+ static MyRequestHandler newRequestDenied() {
+ return new MyRequestHandler(null);
+ }
+ }
+
+ private static class MyRequestContent implements ContentChannel {
+
+ final CountDownLatch writeLatch = new CountDownLatch(1);
+ final CountDownLatch closeLatch = new CountDownLatch(1);
+
+ @Override
+ public void write(ByteBuffer buf, CompletionHandler handler) {
+ if (handler != null) {
+ handler.completed();
+ }
+ writeLatch.countDown();
+ }
+
+ @Override
+ public void close(CompletionHandler handler) {
+ if (handler != null) {
+ handler.completed();
+ }
+ closeLatch.countDown();
+ }
+ }
+
+ private static class MyCompletion implements CompletionHandler {
+
+ final CountDownLatch completedLatch = new CountDownLatch(1);
+
+ @Override
+ public void completed() {
+ completedLatch.countDown();
+ }
+
+ @Override
+ public void failed(Throwable t) {
+
+ }
+ }
+
+ private static class MySession implements ServerSession {
+
+ int refCount = 1;
+
+ @Override
+ public void sendReply(Reply reply) {
+
+ }
+
+ @Override
+ public MessageHandler getMessageHandler() {
+ return null;
+ }
+
+ @Override
+ public void setMessageHandler(MessageHandler msgHandler) {
+
+ }
+
+ @Override
+ public String connectionSpec() {
+ return null;
+ }
+
+ @Override
+ public String name() {
+ return null;
+ }
+
+ @Override
+ public ResourceReference refer() {
+ ++refCount;
+ return new ResourceReference() {
+ @Override
+ public void close() {
+ --refCount;
+ }
+ };
+ }
+
+ @Override
+ public void release() {
+ --refCount;
+ }
+ }
+}
diff --git a/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/ServerThreadingTestCase.java b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/ServerThreadingTestCase.java
new file mode 100644
index 00000000000..a7ee355094f
--- /dev/null
+++ b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/ServerThreadingTestCase.java
@@ -0,0 +1,137 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.jdisc;
+
+import com.yahoo.jdisc.application.ContainerBuilder;
+import com.yahoo.jdisc.service.CurrentContainer;
+import com.yahoo.jdisc.test.TestDriver;
+import com.yahoo.messagebus.DestinationSessionParams;
+import com.yahoo.messagebus.EmptyReply;
+import com.yahoo.messagebus.Message;
+import com.yahoo.messagebus.MessageBus;
+import com.yahoo.messagebus.MessageBusParams;
+import com.yahoo.messagebus.Reply;
+import com.yahoo.messagebus.ReplyHandler;
+import com.yahoo.messagebus.SourceSession;
+import com.yahoo.messagebus.SourceSessionParams;
+import com.yahoo.messagebus.network.local.LocalNetwork;
+import com.yahoo.messagebus.network.local.LocalWire;
+import com.yahoo.messagebus.routing.Route;
+import com.yahoo.messagebus.shared.SharedDestinationSession;
+import com.yahoo.messagebus.shared.SharedMessageBus;
+import com.yahoo.messagebus.test.SimpleMessage;
+import com.yahoo.messagebus.test.SimpleProtocol;
+import org.junit.Test;
+
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/**
+ * @author Simon Thoresen Hult
+ */
+public class ServerThreadingTestCase {
+
+ private static final int NUM_THREADS = 32;
+ private static final int NUM_REQUESTS = 1000;
+
+ @Test
+ public void requireThatServerIsThreadSafe() throws Exception {
+ final LocalWire wire = new LocalWire();
+ final Client client = new Client(wire);
+ final Server server = new Server(wire);
+
+ for (int i = 0; i < NUM_REQUESTS; ++i) {
+ final Message msg = new SimpleMessage("foo");
+ msg.setRoute(Route.parse(server.delegate.connectionSpec()));
+ msg.pushHandler(client);
+ assertThat(client.session.send(msg).isAccepted(), is(true));
+ }
+ for (int i = 0; i < NUM_REQUESTS; ++i) {
+ final Reply reply = client.replies.poll(600, TimeUnit.SECONDS);
+ assertThat(reply, instanceOf(EmptyReply.class));
+ assertThat(reply.hasErrors(), is(false));
+ }
+
+ assertThat(client.close(), is(true));
+ assertThat(server.close(), is(true));
+ }
+
+ private static class Client implements ReplyHandler {
+
+ final BlockingDeque<Reply> replies = new LinkedBlockingDeque<>();
+ final MessageBus mbus;
+ final SourceSession session;
+
+ Client(final LocalWire wire) {
+ mbus = new MessageBus(
+ new LocalNetwork(wire),
+ new MessageBusParams().addProtocol(new SimpleProtocol()));
+ session = mbus.createSourceSession(
+ new SourceSessionParams()
+ .setReplyHandler(this)
+ .setThrottlePolicy(null));
+ }
+
+ @Override
+ public void handleReply(final Reply reply) {
+ replies.addLast(reply);
+ }
+
+ boolean close() {
+ return session.destroy() && mbus.destroy();
+ }
+ }
+
+ private static class Server extends MbusRequestHandler {
+
+ final Executor executor = Executors.newFixedThreadPool(NUM_THREADS);
+ final MbusServer delegate;
+ final TestDriver driver;
+
+ Server(final LocalWire wire) {
+ driver = TestDriver.newSimpleApplicationInstanceWithoutOsgi();
+ delegate = newMbusServer(driver, wire);
+
+ final ContainerBuilder builder = driver.newContainerBuilder();
+ builder.serverBindings().bind("mbus://*/*", this);
+ driver.activateContainer(builder);
+ delegate.start();
+ }
+
+ @Override
+ public void handleMessage(final Message msg) {
+ executor.execute(new Runnable() {
+
+ @Override
+ public void run() {
+ final Reply reply = new EmptyReply();
+ reply.swapState(msg);
+ reply.popHandler().handleReply(reply);
+ }
+ });
+ }
+
+ boolean close() {
+ delegate.release();
+ return driver.close();
+ }
+ }
+
+ private static MbusServer newMbusServer(final CurrentContainer container, final LocalWire wire) {
+ final SharedMessageBus mbus = new SharedMessageBus(new MessageBus(
+ new LocalNetwork(wire),
+ new MessageBusParams().addProtocol(new SimpleProtocol())));
+ final SharedDestinationSession session = mbus.newDestinationSession(
+ new DestinationSessionParams());
+ final MbusServer server = new MbusServer(container, session);
+ session.release();
+ mbus.release();
+ return server;
+ }
+}
diff --git a/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/test/ClientTestDriverTestCase.java b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/test/ClientTestDriverTestCase.java
new file mode 100644
index 00000000000..ef290a070cb
--- /dev/null
+++ b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/test/ClientTestDriverTestCase.java
@@ -0,0 +1,32 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.jdisc.test;
+
+import com.yahoo.jrt.ListenFailedException;
+import com.yahoo.jrt.slobrok.server.Slobrok;
+import com.yahoo.messagebus.test.SimpleProtocol;
+import org.junit.Test;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * @author Simon Thoresen Hult
+ */
+public class ClientTestDriverTestCase {
+
+ @Test
+ public void requireThatFactoryMethodsWork() throws ListenFailedException {
+ ClientTestDriver driver = ClientTestDriver.newInstance();
+ assertNotNull(driver);
+ assertTrue(driver.close());
+
+ driver = ClientTestDriver.newInstanceWithProtocol(new SimpleProtocol());
+ assertNotNull(driver);
+ assertTrue(driver.close());
+
+ Slobrok slobrok = new Slobrok();
+ driver = ClientTestDriver.newInstanceWithExternSlobrok(slobrok.configId());
+ assertNotNull(driver);
+ assertTrue(driver.close());
+ }
+}
diff --git a/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/test/ServerTestDriverTestCase.java b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/test/ServerTestDriverTestCase.java
new file mode 100644
index 00000000000..f6ae2335d12
--- /dev/null
+++ b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/test/ServerTestDriverTestCase.java
@@ -0,0 +1,34 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.jdisc.test;
+
+import com.yahoo.jdisc.test.NonWorkingRequestHandler;
+import com.yahoo.jrt.ListenFailedException;
+import com.yahoo.jrt.slobrok.server.Slobrok;
+import com.yahoo.messagebus.test.SimpleProtocol;
+import org.junit.Test;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * @author Simon Thoresen Hult
+ */
+public class ServerTestDriverTestCase {
+
+ @Test
+ public void requireThatFactoryMethodsWork() throws ListenFailedException {
+ ServerTestDriver driver = ServerTestDriver.newInstance(new NonWorkingRequestHandler(), false);
+ assertNotNull(driver);
+ assertTrue(driver.close());
+
+ driver = ServerTestDriver.newInstanceWithProtocol(new SimpleProtocol(), new NonWorkingRequestHandler(), false);
+ assertNotNull(driver);
+ assertTrue(driver.close());
+
+ Slobrok slobrok = new Slobrok();
+ driver = ServerTestDriver.newInstanceWithExternSlobrok(slobrok.configId(), new NonWorkingRequestHandler(), false);
+ assertNotNull(driver);
+ assertTrue(driver.close());
+ }
+
+}
diff --git a/container-messagebus/src/test/java/com/yahoo/messagebus/shared/SharedDestinationSessionTestCase.java b/container-messagebus/src/test/java/com/yahoo/messagebus/shared/SharedDestinationSessionTestCase.java
new file mode 100644
index 00000000000..78e79da4b9f
--- /dev/null
+++ b/container-messagebus/src/test/java/com/yahoo/messagebus/shared/SharedDestinationSessionTestCase.java
@@ -0,0 +1,134 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.shared;
+
+import com.yahoo.jrt.ListenFailedException;
+import com.yahoo.jrt.slobrok.server.Slobrok;
+import com.yahoo.messagebus.*;
+import com.yahoo.messagebus.jdisc.test.MessageQueue;
+import com.yahoo.messagebus.jdisc.test.RemoteClient;
+import com.yahoo.messagebus.jdisc.test.ReplyQueue;
+import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
+import com.yahoo.messagebus.routing.Route;
+import com.yahoo.messagebus.test.SimpleMessage;
+import com.yahoo.messagebus.test.SimpleProtocol;
+import com.yahoo.messagebus.test.SimpleReply;
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.*;
+
+/**
+ * @author Simon Thoresen Hult
+ */
+public class SharedDestinationSessionTestCase {
+
+ @Test
+ public void requireThatMessageHandlerCanBeAccessed() {
+ SharedDestinationSession session = newDestinationSession();
+ assertNull(session.getMessageHandler());
+
+ MessageQueue handler = new MessageQueue();
+ session.setMessageHandler(handler);
+ assertSame(handler, session.getMessageHandler());
+ }
+
+ @Test
+ public void requireThatMessageHandlerCanOnlyBeSetOnce() {
+ SharedDestinationSession session = newDestinationSession();
+ session.setMessageHandler(new MessageQueue());
+ try {
+ session.setMessageHandler(new MessageQueue());
+ fail();
+ } catch (IllegalStateException e) {
+ assertEquals("Message handler already registered.", e.getMessage());
+ }
+ session.release();
+ }
+
+ @Test
+ public void requireThatMessageHandlerIsCalled() throws InterruptedException {
+ SharedDestinationSession session = newDestinationSession();
+ MessageQueue queue = new MessageQueue();
+ session.setMessageHandler(queue);
+ session.handleMessage(new SimpleMessage("foo"));
+ assertNotNull(queue.awaitMessage(60, TimeUnit.SECONDS));
+ session.release();
+ }
+
+ @Test
+ public void requireThatSessionRepliesIfMessageHandlerIsNull() throws InterruptedException {
+ SharedDestinationSession session = newDestinationSession();
+ Message msg = new SimpleMessage("foo");
+ ReplyQueue queue = new ReplyQueue();
+ msg.pushHandler(queue);
+ session.handleMessage(msg);
+ Reply reply = queue.awaitReply(60, TimeUnit.SECONDS);
+ assertNotNull(reply);
+ assertEquals(1, reply.getNumErrors());
+ assertEquals(ErrorCode.SESSION_BUSY, reply.getError(0).getCode());
+ session.release();
+ }
+
+ @Test
+ public void requireThatSessionIsClosedOnDestroy() {
+ SharedDestinationSession session = newDestinationSession();
+ session.release();
+ assertFalse("DestinationSession not destroyed by release().", session.session().destroy());
+ }
+
+ @Test
+ public void requireThatMbusIsReleasedOnDestroy() {
+ Slobrok slobrok = null;
+ try {
+ slobrok = new Slobrok();
+ } catch (ListenFailedException e) {
+ fail();
+ }
+ RPCNetworkParams netParams = new RPCNetworkParams().setSlobrokConfigId(slobrok.configId());
+ SharedMessageBus mbus = SharedMessageBus.newInstance(new MessageBusParams(), netParams);
+ SharedDestinationSession session = mbus.newDestinationSession(new DestinationSessionParams());
+ mbus.release();
+ session.release();
+ assertFalse("MessageBus not destroyed by release().", mbus.messageBus().destroy());
+ }
+
+ @Test
+ public void requireThatSessionCanSendReply() throws InterruptedException {
+ RemoteClient client = RemoteClient.newInstanceWithInternSlobrok(true);
+ MessageQueue queue = new MessageQueue();
+ DestinationSessionParams params = new DestinationSessionParams().setMessageHandler(queue);
+ SharedDestinationSession session = newDestinationSession(client.slobrokId(), params);
+ Route route = Route.parse(session.connectionSpec());
+
+ assertTrue(client.sendMessage(new SimpleMessage("foo").setRoute(route)).isAccepted());
+ Message msg = queue.awaitMessage(60, TimeUnit.SECONDS);
+ assertNotNull(msg);
+ Reply reply = new SimpleReply("bar");
+ reply.swapState(msg);
+ session.sendReply(reply);
+ assertNotNull(client.awaitReply(60, TimeUnit.SECONDS));
+
+ session.release();
+ client.close();
+ }
+
+ private static SharedDestinationSession newDestinationSession() {
+ Slobrok slobrok = null;
+ try {
+ slobrok = new Slobrok();
+ } catch (ListenFailedException e) {
+ fail();
+ }
+ return newDestinationSession(slobrok.configId(), new DestinationSessionParams());
+ }
+
+ private static SharedDestinationSession newDestinationSession(String slobrokId, DestinationSessionParams params) {
+ RPCNetworkParams netParams = new RPCNetworkParams().setSlobrokConfigId(slobrokId);
+ MessageBusParams mbusParams = new MessageBusParams().addProtocol(new SimpleProtocol());
+ SharedMessageBus mbus = SharedMessageBus.newInstance(mbusParams, netParams);
+ SharedDestinationSession session = mbus.newDestinationSession(params);
+ mbus.release();
+ return session;
+ }
+}
diff --git a/container-messagebus/src/test/java/com/yahoo/messagebus/shared/SharedIntermediateSessionTestCase.java b/container-messagebus/src/test/java/com/yahoo/messagebus/shared/SharedIntermediateSessionTestCase.java
new file mode 100644
index 00000000000..87958415149
--- /dev/null
+++ b/container-messagebus/src/test/java/com/yahoo/messagebus/shared/SharedIntermediateSessionTestCase.java
@@ -0,0 +1,174 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.shared;
+
+import com.yahoo.jrt.ListenFailedException;
+import com.yahoo.jrt.slobrok.server.Slobrok;
+import com.yahoo.messagebus.*;
+import com.yahoo.messagebus.jdisc.test.MessageQueue;
+import com.yahoo.messagebus.jdisc.test.RemoteClient;
+import com.yahoo.messagebus.jdisc.test.RemoteServer;
+import com.yahoo.messagebus.jdisc.test.ReplyQueue;
+import com.yahoo.messagebus.network.local.LocalNetwork;
+import com.yahoo.messagebus.network.local.LocalWire;
+import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
+import com.yahoo.messagebus.routing.Route;
+import com.yahoo.messagebus.test.SimpleMessage;
+import com.yahoo.messagebus.test.SimpleProtocol;
+import com.yahoo.messagebus.test.SimpleReply;
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.*;
+
+/**
+ * @author Simon Thoresen Hult
+ */
+public class SharedIntermediateSessionTestCase {
+
+ @Test
+ public void requireThatMessageHandlerCanBeAccessed() {
+ SharedIntermediateSession session = newIntermediateSession(false);
+ assertNull(session.getMessageHandler());
+
+ MessageQueue handler = new MessageQueue();
+ session.setMessageHandler(handler);
+ assertSame(handler, session.getMessageHandler());
+ }
+
+ @Test
+ public void requireThatMessageHandlerCanOnlyBeSetOnce() {
+ SharedIntermediateSession session = newIntermediateSession(false);
+ session.setMessageHandler(new MessageQueue());
+ try {
+ session.setMessageHandler(new MessageQueue());
+ fail();
+ } catch (IllegalStateException e) {
+ assertEquals("Message handler already registered.", e.getMessage());
+ }
+ session.release();
+ }
+
+ @Test
+ public void requireThatMessageHandlerIsCalled() throws InterruptedException {
+ SharedIntermediateSession session = newIntermediateSession(false);
+ MessageQueue queue = new MessageQueue();
+ session.setMessageHandler(queue);
+ session.handleMessage(new SimpleMessage("foo"));
+ assertNotNull(queue.awaitMessage(60, TimeUnit.SECONDS));
+ session.release();
+ }
+
+ @Test
+ public void requireThatSessionRepliesIfMessageHandlerIsNull() throws InterruptedException {
+ SharedIntermediateSession session = newIntermediateSession(false);
+ Message msg = new SimpleMessage("foo");
+ ReplyQueue queue = new ReplyQueue();
+ msg.pushHandler(queue);
+ session.handleMessage(msg);
+ Reply reply = queue.awaitReply(60, TimeUnit.SECONDS);
+ assertNotNull(reply);
+ assertEquals(1, reply.getNumErrors());
+ assertEquals(ErrorCode.SESSION_BUSY, reply.getError(0).getCode());
+ session.release();
+ }
+
+ @Test
+ public void requireThatReplyHandlerCanNotBeSet() throws ListenFailedException {
+ Slobrok slobrok = new Slobrok();
+ try {
+ newIntermediateSession(slobrok.configId(),
+ new IntermediateSessionParams().setReplyHandler(new ReplyQueue()),
+ false);
+ fail();
+ } catch (IllegalArgumentException e) {
+ assertEquals("Reply handler must be null.", e.getMessage());
+ }
+ }
+
+ @Test
+ public void requireThatSessionIsClosedOnDestroy() {
+ SharedIntermediateSession session = newIntermediateSession(false);
+ session.release();
+ assertFalse("IntermediateSession not destroyed by release().", session.session().destroy());
+ }
+
+ @Test
+ public void requireThatMbusIsReleasedOnDestroy() {
+ try {
+ new Slobrok();
+ } catch (ListenFailedException e) {
+ fail();
+ }
+ SharedMessageBus mbus = new SharedMessageBus(new MessageBus(new LocalNetwork(new LocalWire()), new MessageBusParams()));
+
+ SharedIntermediateSession session = mbus.newIntermediateSession(new IntermediateSessionParams());
+ mbus.release();
+ session.release();
+ assertFalse("MessageBus not destroyed by release().", mbus.messageBus().destroy());
+ }
+
+ @Test
+ public void requireThatSessionCanSendMessage() throws InterruptedException {
+ RemoteServer server = RemoteServer.newInstanceWithInternSlobrok();
+ SharedIntermediateSession session = newIntermediateSession(server.slobrokId(),
+ new IntermediateSessionParams(),
+ true);
+ ReplyQueue queue = new ReplyQueue();
+ Message msg = new SimpleMessage("foo").setRoute(Route.parse(server.connectionSpec()));
+ msg.setTimeReceivedNow();
+ msg.setTimeRemaining(60000);
+ msg.getTrace().setLevel(9);
+ msg.pushHandler(queue);
+ assertTrue(session.sendMessage(msg).isAccepted());
+ assertNotNull(msg = server.awaitMessage(60, TimeUnit.SECONDS));
+ server.ackMessage(msg);
+ assertNotNull(queue.awaitReply(60, TimeUnit.SECONDS));
+
+ session.release();
+ server.close();
+ }
+
+ @Test
+ public void requireThatSessionCanSendReply() throws InterruptedException {
+ RemoteClient client = RemoteClient.newInstanceWithInternSlobrok(true);
+ MessageQueue queue = new MessageQueue();
+ IntermediateSessionParams params = new IntermediateSessionParams().setMessageHandler(queue);
+ SharedIntermediateSession session = newIntermediateSession(client.slobrokId(), params, true);
+ Route route = Route.parse(session.connectionSpec());
+
+ assertTrue(client.sendMessage(new SimpleMessage("foo").setRoute(route)).isAccepted());
+ Message msg = queue.awaitMessage(60, TimeUnit.SECONDS);
+ assertNotNull(msg);
+ Reply reply = new SimpleReply("bar");
+ reply.swapState(msg);
+ session.sendReply(reply);
+ assertNotNull(client.awaitReply(60, TimeUnit.SECONDS));
+
+ session.release();
+ client.close();
+ }
+
+ private static SharedIntermediateSession newIntermediateSession(boolean network) {
+ Slobrok slobrok = null;
+ try {
+ slobrok = new Slobrok();
+ } catch (ListenFailedException e) {
+ fail();
+ }
+ return newIntermediateSession(slobrok.configId(), new IntermediateSessionParams(), network);
+ }
+
+ private static SharedIntermediateSession newIntermediateSession(String slobrokId,
+ IntermediateSessionParams params,
+ boolean network) {
+ RPCNetworkParams netParams = new RPCNetworkParams().setSlobrokConfigId(slobrokId);
+ MessageBusParams mbusParams = new MessageBusParams().addProtocol(new SimpleProtocol());
+ SharedMessageBus mbus = network
+ ? SharedMessageBus.newInstance(mbusParams, netParams)
+ : new SharedMessageBus(new MessageBus(new LocalNetwork(new LocalWire()), mbusParams));
+ SharedIntermediateSession session = mbus.newIntermediateSession(params);
+ mbus.release();
+ return session;
+ }
+}
diff --git a/container-messagebus/src/test/java/com/yahoo/messagebus/shared/SharedMessageBusTestCase.java b/container-messagebus/src/test/java/com/yahoo/messagebus/shared/SharedMessageBusTestCase.java
new file mode 100644
index 00000000000..a54489a89e6
--- /dev/null
+++ b/container-messagebus/src/test/java/com/yahoo/messagebus/shared/SharedMessageBusTestCase.java
@@ -0,0 +1,37 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.shared;
+
+import com.yahoo.jrt.ListenFailedException;
+import com.yahoo.jrt.slobrok.server.Slobrok;
+import com.yahoo.messagebus.MessageBusParams;
+import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+
+/**
+ * @author Simon Thoresen Hult
+ */
+public class SharedMessageBusTestCase {
+
+ @Test
+ public void requireThatMbusCanNotBeNull() {
+ try {
+ new SharedMessageBus(null);
+ fail();
+ } catch (NullPointerException e) {
+ // expected
+ }
+ }
+
+ @Test
+ public void requireThatMbusIsClosedOnDestroy() throws ListenFailedException {
+ Slobrok slobrok = new Slobrok();
+ SharedMessageBus mbus = SharedMessageBus.newInstance(new MessageBusParams(),
+ new RPCNetworkParams()
+ .setSlobrokConfigId(slobrok.configId()));
+ mbus.release();
+ assertFalse(mbus.messageBus().destroy());
+ }
+}
diff --git a/container-messagebus/src/test/java/com/yahoo/messagebus/shared/SharedSourceSessionTestCase.java b/container-messagebus/src/test/java/com/yahoo/messagebus/shared/SharedSourceSessionTestCase.java
new file mode 100644
index 00000000000..1f0966fc961
--- /dev/null
+++ b/container-messagebus/src/test/java/com/yahoo/messagebus/shared/SharedSourceSessionTestCase.java
@@ -0,0 +1,94 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.shared;
+
+import com.yahoo.jrt.ListenFailedException;
+import com.yahoo.jrt.slobrok.server.Slobrok;
+import com.yahoo.messagebus.Message;
+import com.yahoo.messagebus.MessageBusParams;
+import com.yahoo.messagebus.SourceSessionParams;
+import com.yahoo.messagebus.jdisc.test.RemoteServer;
+import com.yahoo.messagebus.jdisc.test.ReplyQueue;
+import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
+import com.yahoo.messagebus.routing.Route;
+import com.yahoo.messagebus.test.SimpleMessage;
+import com.yahoo.messagebus.test.SimpleProtocol;
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.*;
+
+/**
+ * @author Simon Thoresen Hult
+ */
+public class SharedSourceSessionTestCase {
+
+ @Test
+ public void requireThatReplyHandlerCanNotBeSet() {
+ try {
+ newSourceSession(new SourceSessionParams().setReplyHandler(new ReplyQueue()));
+ fail();
+ } catch (IllegalArgumentException e) {
+ assertEquals("Reply handler must be null.", e.getMessage());
+ }
+ }
+
+ @Test
+ public void requireThatSessionIsClosedOnDestroy() {
+ SharedSourceSession session = newSourceSession(new SourceSessionParams());
+ session.release();
+ assertFalse("SourceSession not destroyed by release().", session.session().destroy());
+ }
+
+ @Test
+ public void requireThatMbusIsReleasedOnDestroy() {
+ Slobrok slobrok = null;
+ try {
+ slobrok = new Slobrok();
+ } catch (ListenFailedException e) {
+ fail();
+ }
+ RPCNetworkParams netParams = new RPCNetworkParams().setSlobrokConfigId(slobrok.configId());
+ SharedMessageBus mbus = SharedMessageBus.newInstance(new MessageBusParams(), netParams);
+ SharedSourceSession session = mbus.newSourceSession(new SourceSessionParams());
+ mbus.release();
+ session.release();
+ assertFalse("MessageBus not destroyed by release().", mbus.messageBus().destroy());
+ }
+
+ @Test
+ public void requireThatSessionCanSendMessage() throws InterruptedException {
+ RemoteServer server = RemoteServer.newInstanceWithInternSlobrok();
+ SharedSourceSession session = newSourceSession(server.slobrokId(),
+ new SourceSessionParams());
+ ReplyQueue queue = new ReplyQueue();
+ Message msg = new SimpleMessage("foo").setRoute(Route.parse(server.connectionSpec()));
+ msg.pushHandler(queue);
+ assertTrue(session.sendMessage(msg).isAccepted());
+ assertNotNull(msg = server.awaitMessage(60, TimeUnit.SECONDS));
+ server.ackMessage(msg);
+ assertNotNull(queue.awaitReply(60, TimeUnit.SECONDS));
+
+ session.release();
+ server.close();
+ }
+
+ private static SharedSourceSession newSourceSession(SourceSessionParams params) {
+ Slobrok slobrok = null;
+ try {
+ slobrok = new Slobrok();
+ } catch (ListenFailedException e) {
+ fail();
+ }
+ return newSourceSession(slobrok.configId(), params);
+ }
+
+ private static SharedSourceSession newSourceSession(String slobrokId, SourceSessionParams params) {
+ RPCNetworkParams netParams = new RPCNetworkParams().setSlobrokConfigId(slobrokId);
+ MessageBusParams mbusParams = new MessageBusParams().addProtocol(new SimpleProtocol());
+ SharedMessageBus mbus = SharedMessageBus.newInstance(mbusParams, netParams);
+ SharedSourceSession session = mbus.newSourceSession(params);
+ mbus.release();
+ return session;
+ }
+}