summaryrefslogtreecommitdiffstats
path: root/jdisc_messagebus_service/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'jdisc_messagebus_service/src/main')
-rw-r--r--jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/IgnoredCompletionHandler.java22
-rw-r--r--jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/MbusClient.java147
-rw-r--r--jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/MbusRequest.java38
-rw-r--r--jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/MbusRequestHandler.java59
-rw-r--r--jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/MbusResponse.java25
-rw-r--r--jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/MbusServer.java135
-rw-r--r--jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/StatusCodes.java79
-rw-r--r--jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/package-info.java5
-rw-r--r--jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/ClientTestDriver.java134
-rw-r--r--jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/MessageQueue.java27
-rw-r--r--jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteClient.java73
-rw-r--r--jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteServer.java87
-rw-r--r--jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/ReplyQueue.java26
-rw-r--r--jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/ServerTestDriver.java154
-rw-r--r--jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/metrics/package-info.java5
-rw-r--r--jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/network/package-info.java5
-rw-r--r--jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/network/rpc/package-info.java5
-rw-r--r--jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/package-info.java5
-rw-r--r--jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/routing/package-info.java5
-rw-r--r--jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/ClientSession.java14
-rw-r--r--jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/NullNetwork.java73
-rw-r--r--jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/ServerSession.java22
-rw-r--r--jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/SharedDestinationSession.java81
-rw-r--r--jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/SharedIntermediateSession.java98
-rw-r--r--jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/SharedMessageBus.java64
-rw-r--r--jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/SharedSourceSession.java60
-rw-r--r--jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/package-info.java8
-rw-r--r--jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/test/package-info.java5
28 files changed, 1461 insertions, 0 deletions
diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/IgnoredCompletionHandler.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/IgnoredCompletionHandler.java
new file mode 100644
index 00000000000..8c55be9cd89
--- /dev/null
+++ b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/IgnoredCompletionHandler.java
@@ -0,0 +1,22 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.jdisc;
+
+import com.yahoo.jdisc.handler.CompletionHandler;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen Hult</a>
+ */
+enum IgnoredCompletionHandler implements CompletionHandler {
+
+ INSTANCE;
+
+ @Override
+ public void completed() {
+
+ }
+
+ @Override
+ public void failed(final Throwable t) {
+
+ }
+}
diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/MbusClient.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/MbusClient.java
new file mode 100644
index 00000000000..e1740433d83
--- /dev/null
+++ b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/MbusClient.java
@@ -0,0 +1,147 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.jdisc;
+
+import com.google.inject.Inject;
+import com.yahoo.jdisc.AbstractResource;
+import com.yahoo.jdisc.Request;
+import com.yahoo.jdisc.ResourceReference;
+import com.yahoo.jdisc.handler.ContentChannel;
+import com.yahoo.jdisc.handler.RequestDeniedException;
+import com.yahoo.jdisc.handler.ResponseHandler;
+import com.yahoo.jdisc.service.ClientProvider;
+import com.yahoo.log.LogLevel;
+import com.yahoo.messagebus.EmptyReply;
+import com.yahoo.messagebus.Error;
+import com.yahoo.messagebus.ErrorCode;
+import com.yahoo.messagebus.Message;
+import com.yahoo.messagebus.Reply;
+import com.yahoo.messagebus.ReplyHandler;
+import com.yahoo.messagebus.shared.ClientSession;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Logger;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public final class MbusClient extends AbstractResource implements ClientProvider, ReplyHandler {
+
+ private static final Logger log = Logger.getLogger(MbusClient.class.getName());
+ private final BlockingQueue<MbusRequest> queue = new LinkedBlockingQueue<>();
+ private final ClientSession session;
+ private final Thread thread = new Thread(new SenderTask(), "MbusClient");
+ private volatile boolean done = false;
+ private final ResourceReference sessionReference;
+
+ @Inject
+ public MbusClient(final ClientSession session) {
+ this.session = session;
+ this.sessionReference = session.refer();
+ }
+
+ @Override
+ public void start() {
+ thread.start();
+ }
+
+ @Override
+ public ContentChannel handleRequest(final Request request, final ResponseHandler handler) {
+ if (!(request instanceof MbusRequest)) {
+ throw new RequestDeniedException(request);
+ }
+ final Message msg = ((MbusRequest)request).getMessage();
+ msg.getTrace().trace(6, "Request received by MbusClient.");
+ msg.pushHandler(null); // save user context
+ final Long timeout = request.timeRemaining(TimeUnit.MILLISECONDS);
+ if (timeout != null) {
+ msg.setTimeReceivedNow();
+ msg.setTimeRemaining(Math.max(1, timeout)); // negative or zero timeout has semantics
+ }
+ msg.setContext(handler);
+ msg.pushHandler(this);
+ queue.add((MbusRequest)request);
+ return null;
+ }
+
+ @Override
+ public void handleTimeout(final Request request, final ResponseHandler handler) {
+ // ignore, mbus has guaranteed reply
+ }
+
+ @Override
+ protected void destroy() {
+ log.log(LogLevel.DEBUG, "Destroying message bus client.");
+ sessionReference.close();
+ done = true;
+ }
+
+ @Override
+ public void handleReply(final Reply reply) {
+ reply.getTrace().trace(6, "Reply received by MbusClient.");
+ final ResponseHandler handler = (ResponseHandler)reply.getContext();
+ reply.popHandler(); // restore user context
+ try {
+ handler.handleResponse(new MbusResponse(StatusCodes.fromMbusReply(reply), reply))
+ .close(IgnoredCompletionHandler.INSTANCE);
+ } catch (final Exception e) {
+ log.log(LogLevel.WARNING, "Ignoring exception thrown by ResponseHandler.", e);
+ }
+ }
+
+ private void sendBlocking(final MbusRequest request) {
+ while (!sendMessage(request)) {
+ try {
+ Thread.sleep(100);
+ } catch (final InterruptedException e) {
+ // ignore
+ }
+ }
+ }
+
+ private boolean sendMessage(final MbusRequest request) {
+ Error error;
+ final Long millis = request.timeRemaining(TimeUnit.MILLISECONDS);
+ if (millis != null && millis <= 0) {
+ error = new Error(ErrorCode.TIMEOUT, request.getTimeout(TimeUnit.MILLISECONDS) + " millis");
+ } else if (request.isCancelled()) {
+ error = new Error(ErrorCode.APP_FATAL_ERROR, "request cancelled");
+ } else {
+ try {
+ error = session.sendMessage(request.getMessage()).getError();
+ } catch (final Exception e) {
+ error = new Error(ErrorCode.FATAL_ERROR, e.toString());
+ }
+ }
+ if (error == null) {
+ return true;
+ }
+ if (error.isFatal()) {
+ final Reply reply = new EmptyReply();
+ reply.swapState(request.getMessage());
+ reply.addError(error);
+ reply.popHandler().handleReply(reply);
+ return true;
+ }
+ return false;
+ }
+
+ private class SenderTask implements Runnable {
+
+ @Override
+ public void run() {
+ while (!done) {
+ try {
+ final MbusRequest request = queue.poll(100, TimeUnit.MILLISECONDS);
+ if (request == null) {
+ continue;
+ }
+ sendBlocking(request);
+ } catch (final Exception e) {
+ log.log(LogLevel.WARNING, "Ignoring exception thrown by MbusClient.", e);
+ }
+ }
+ }
+ }
+}
diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/MbusRequest.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/MbusRequest.java
new file mode 100644
index 00000000000..55c5f28ead6
--- /dev/null
+++ b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/MbusRequest.java
@@ -0,0 +1,38 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.jdisc;
+
+import com.yahoo.jdisc.Request;
+import com.yahoo.jdisc.service.CurrentContainer;
+import com.yahoo.messagebus.Message;
+
+import java.net.URI;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public class MbusRequest extends Request {
+
+ private final Message message;
+
+ public MbusRequest(CurrentContainer current, URI uri, Message msg) {
+ super(current, uri);
+ this.message = validateMessage(msg);
+ }
+
+ public MbusRequest(Request parent, URI uri, Message msg) {
+ super(parent, uri);
+ this.message = validateMessage(msg);
+ }
+
+ public Message getMessage() {
+ return message;
+ }
+
+ private Message validateMessage(Message msg) {
+ if (msg != null) {
+ return msg;
+ }
+ release();
+ throw new NullPointerException();
+ }
+}
diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/MbusRequestHandler.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/MbusRequestHandler.java
new file mode 100644
index 00000000000..73c143eb29d
--- /dev/null
+++ b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/MbusRequestHandler.java
@@ -0,0 +1,59 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.jdisc;
+
+import com.yahoo.jdisc.Request;
+import com.yahoo.jdisc.handler.AbstractRequestHandler;
+import com.yahoo.jdisc.handler.CompletionHandler;
+import com.yahoo.jdisc.handler.ContentChannel;
+import com.yahoo.jdisc.handler.ResponseHandler;
+import com.yahoo.messagebus.Message;
+import com.yahoo.messagebus.MessageHandler;
+import com.yahoo.messagebus.Reply;
+import com.yahoo.messagebus.ReplyHandler;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen Hult</a>
+ */
+public abstract class MbusRequestHandler extends AbstractRequestHandler implements MessageHandler {
+
+ @Override
+ public ContentChannel handleRequest(final Request request, final ResponseHandler handler) {
+ if (!(request instanceof MbusRequest)) {
+ throw new UnsupportedOperationException("Expected MbusRequest, got " + request.getClass().getName() + ".");
+ }
+ final Message msg = ((MbusRequest)request).getMessage();
+ msg.pushHandler(new RespondingReplyHandler(handler));
+ handleMessage(msg);
+ return null;
+ }
+
+ private static class RespondingReplyHandler implements ReplyHandler {
+
+ private final ResponseHandler handler;
+
+ RespondingReplyHandler(final ResponseHandler handler) {
+ this.handler = handler;
+ }
+
+ @Override
+ public void handleReply(final Reply reply) {
+ final MbusResponse response = new MbusResponse(StatusCodes.fromMbusReply(reply), reply);
+ handler.handleResponse(response).close(IgnoringCompletionHandler.INSTANCE);
+ }
+ }
+
+ private static class IgnoringCompletionHandler implements CompletionHandler {
+
+ public static final IgnoringCompletionHandler INSTANCE = new IgnoringCompletionHandler();
+
+ @Override
+ public void completed() {
+
+ }
+
+ @Override
+ public void failed(final Throwable t) {
+
+ }
+ }
+}
diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/MbusResponse.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/MbusResponse.java
new file mode 100644
index 00000000000..23baa17b881
--- /dev/null
+++ b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/MbusResponse.java
@@ -0,0 +1,25 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.jdisc;
+
+import com.yahoo.jdisc.Response;
+import com.yahoo.messagebus.Reply;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public class MbusResponse extends Response {
+
+ private final Reply reply;
+
+ public MbusResponse(int status, Reply reply) {
+ super(status);
+ if (reply == null) {
+ throw new NullPointerException();
+ }
+ this.reply = reply;
+ }
+
+ public Reply getReply() {
+ return reply;
+ }
+}
diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/MbusServer.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/MbusServer.java
new file mode 100644
index 00000000000..6a8b321cac8
--- /dev/null
+++ b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/MbusServer.java
@@ -0,0 +1,135 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.jdisc;
+
+import com.google.inject.Inject;
+import com.yahoo.jdisc.AbstractResource;
+import com.yahoo.jdisc.Request;
+import com.yahoo.jdisc.ResourceReference;
+import com.yahoo.jdisc.Response;
+import com.yahoo.jdisc.handler.ContentChannel;
+import com.yahoo.jdisc.handler.ResponseHandler;
+import com.yahoo.jdisc.service.CurrentContainer;
+import com.yahoo.jdisc.service.ServerProvider;
+import com.yahoo.log.LogLevel;
+import com.yahoo.messagebus.*;
+import com.yahoo.messagebus.Error;
+import com.yahoo.messagebus.shared.ServerSession;
+
+import java.net.URI;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Logger;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public final class MbusServer extends AbstractResource implements ServerProvider, MessageHandler {
+
+ private final static Logger log = Logger.getLogger(MbusServer.class.getName());
+ private final AtomicBoolean running = new AtomicBoolean(false);
+ private final CurrentContainer container;
+ private final ServerSession session;
+ private final URI uri;
+ private final ResourceReference sessionReference;
+
+ @Inject
+ public MbusServer(final CurrentContainer container, final ServerSession session) {
+ this.container = container;
+ this.session = session;
+ uri = URI.create("mbus://localhost/" + session.name());
+ session.setMessageHandler(this);
+ sessionReference = session.refer();
+ }
+
+ @Override
+ public void start() {
+ log.log(LogLevel.DEBUG, "Starting message bus server.");
+ running.set(true);
+ }
+
+ @Override
+ public void close() {
+ log.log(LogLevel.DEBUG, "Closing message bus server.");
+ running.set(false);
+ }
+
+ @Override
+ protected void destroy() {
+ log.log(LogLevel.DEBUG, "Destroying message bus server.");
+ running.set(false);
+ sessionReference.close();
+ }
+
+ @Override
+ public void handleMessage(final Message msg) {
+ if (!running.get()) {
+ dispatchErrorReply(msg, ErrorCode.SESSION_BUSY, "Session temporarily closed.");
+ return;
+ }
+ if (msg.getTrace().shouldTrace(6)) {
+ msg.getTrace().trace(6, "Message received by MbusServer.");
+ }
+ Request request = null;
+ ContentChannel content = null;
+ try {
+ request = new MbusRequest(container, uri, msg);
+ content = request.connect(new ServerResponseHandler(msg));
+ } catch (final RuntimeException e) {
+ dispatchErrorReply(msg, ErrorCode.APP_FATAL_ERROR, e.toString());
+ } finally {
+ if (request != null) {
+ request.release();
+ }
+ }
+ if (content != null) {
+ content.close(IgnoredCompletionHandler.INSTANCE);
+ }
+ }
+
+ public String connectionSpec() {
+ return session.connectionSpec();
+ }
+
+ private void dispatchErrorReply(final Message msg, final int errCode, final String errMsg) {
+ final Reply reply = new EmptyReply();
+ reply.swapState(msg);
+ reply.addError(new Error(errCode, errMsg));
+ session.sendReply(reply);
+ }
+
+ private class ServerResponseHandler implements ResponseHandler {
+
+ final Message msg;
+
+ ServerResponseHandler(final Message msg) {
+ this.msg = msg;
+ }
+
+ @Override
+ public ContentChannel handleResponse(final Response response) {
+ final Reply reply;
+ if (response instanceof MbusResponse) {
+ reply = ((MbusResponse)response).getReply();
+ } else {
+ reply = new EmptyReply();
+ reply.swapState(msg);
+ }
+ final Error err = StatusCodes.toMbusError(response.getStatus());
+ if (err != null) {
+ if (err.isFatal()) {
+ if (!reply.hasFatalErrors()) {
+ reply.addError(err);
+ }
+ } else {
+ if (!reply.hasErrors()) {
+ reply.addError(err);
+ }
+ }
+ }
+ if (reply.getTrace().shouldTrace(6)) {
+ reply.getTrace().trace(6, "Sending reply from MbusServer.");
+ }
+ session.sendReply(reply);
+ return null;
+ }
+ }
+}
diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/StatusCodes.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/StatusCodes.java
new file mode 100644
index 00000000000..b7543054b8c
--- /dev/null
+++ b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/StatusCodes.java
@@ -0,0 +1,79 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.jdisc;
+
+import com.yahoo.jdisc.Response;
+import com.yahoo.messagebus.Error;
+import com.yahoo.messagebus.ErrorCode;
+import com.yahoo.messagebus.Reply;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen Hult</a>
+ */
+public class StatusCodes {
+
+ public static int fromMbusReply(final Reply reply) {
+ int statusCode = Response.Status.OK;
+ for (int i = 0, len = reply.getNumErrors(); i < len; ++i) {
+ statusCode = Math.max(statusCode, fromMbusError(reply.getError(i)));
+ }
+ return statusCode;
+ }
+
+ public static int fromMbusError(final Error error) {
+ final int errorCode = error.getCode();
+ if (errorCode < ErrorCode.TRANSIENT_ERROR) {
+ return Response.Status.OK;
+ }
+ if (errorCode < ErrorCode.FATAL_ERROR) {
+ return Response.Status.TEMPORARY_REDIRECT;
+ }
+ switch (errorCode) {
+ case ErrorCode.SEND_QUEUE_CLOSED:
+ return Response.Status.LOCKED;
+ case ErrorCode.ILLEGAL_ROUTE:
+ return Response.Status.BAD_REQUEST;
+ case ErrorCode.NO_SERVICES_FOR_ROUTE:
+ return Response.Status.NOT_FOUND;
+ case ErrorCode.SERVICE_OOS:
+ return Response.Status.SERVICE_UNAVAILABLE;
+ case ErrorCode.ENCODE_ERROR:
+ return Response.Status.BAD_REQUEST;
+ case ErrorCode.NETWORK_ERROR:
+ return Response.Status.BAD_REQUEST; // got nothing better
+ case ErrorCode.UNKNOWN_PROTOCOL:
+ return Response.Status.UNSUPPORTED_MEDIA_TYPE;
+ case ErrorCode.DECODE_ERROR:
+ return Response.Status.UNSUPPORTED_MEDIA_TYPE;
+ case ErrorCode.TIMEOUT:
+ return Response.Status.REQUEST_TIMEOUT;
+ case ErrorCode.INCOMPATIBLE_VERSION:
+ return Response.Status.VERSION_NOT_SUPPORTED;
+ case ErrorCode.UNKNOWN_POLICY:
+ return Response.Status.BAD_REQUEST;
+ case ErrorCode.NETWORK_SHUTDOWN:
+ return Response.Status.LOCKED;
+ case ErrorCode.POLICY_ERROR:
+ return Response.Status.PRECONDITION_FAILED;
+ case ErrorCode.SEQUENCE_ERROR:
+ return Response.Status.PRECONDITION_FAILED;
+ case ErrorCode.APP_FATAL_ERROR:
+ return Response.Status.INTERNAL_SERVER_ERROR;
+ default:
+ return Response.Status.INTERNAL_SERVER_ERROR;
+ }
+ }
+
+ public static Error toMbusError(final int statusCode) {
+ if (statusCode < 300) {
+ return null;
+ } else if (statusCode < 400) {
+ return new Error(ErrorCode.APP_TRANSIENT_ERROR, statusCode + " Redirection");
+ } else if (statusCode < 500) {
+ return new Error(ErrorCode.APP_FATAL_ERROR, statusCode + " Client Error");
+ } else if (statusCode < 600) {
+ return new Error(ErrorCode.APP_FATAL_ERROR, statusCode + " Server Error");
+ } else {
+ return new Error(ErrorCode.APP_FATAL_ERROR, statusCode + " Unknown Error");
+ }
+ }
+}
diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/package-info.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/package-info.java
new file mode 100644
index 00000000000..99c9a28b381
--- /dev/null
+++ b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/package-info.java
@@ -0,0 +1,5 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+@ExportPackage
+package com.yahoo.messagebus.jdisc;
+
+import com.yahoo.osgi.annotation.ExportPackage;
diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/ClientTestDriver.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/ClientTestDriver.java
new file mode 100644
index 00000000000..b478f9fe8db
--- /dev/null
+++ b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/ClientTestDriver.java
@@ -0,0 +1,134 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.jdisc.test;
+
+import com.yahoo.jdisc.References;
+import com.yahoo.jdisc.Request;
+import com.yahoo.jdisc.ResourceReference;
+import com.yahoo.jdisc.application.ContainerBuilder;
+import com.yahoo.jdisc.handler.ResponseHandler;
+import com.yahoo.jdisc.test.TestDriver;
+import com.yahoo.messagebus.*;
+import com.yahoo.messagebus.jdisc.MbusClient;
+import com.yahoo.messagebus.jdisc.MbusRequest;
+import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
+import com.yahoo.messagebus.routing.Route;
+import com.yahoo.messagebus.shared.SharedMessageBus;
+import com.yahoo.messagebus.shared.SharedSourceSession;
+import com.yahoo.messagebus.test.SimpleProtocol;
+
+import java.net.URI;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public class ClientTestDriver {
+
+ private final RemoteServer server;
+ private final MbusClient client;
+ private final SharedSourceSession session;
+ private final TestDriver driver;
+
+ private ClientTestDriver(RemoteServer server, Protocol protocol) {
+ this.server = server;
+
+ MessageBusParams mbusParams = new MessageBusParams().addProtocol(protocol);
+ RPCNetworkParams netParams = new RPCNetworkParams().setSlobrokConfigId(server.slobrokId());
+ SharedMessageBus mbus = SharedMessageBus.newInstance(mbusParams, netParams);
+ session = mbus.newSourceSession(new SourceSessionParams());
+ client = new MbusClient(session);
+ client.start();
+ mbus.release();
+
+ driver = TestDriver.newSimpleApplicationInstanceWithoutOsgi();
+ ContainerBuilder builder = driver.newContainerBuilder();
+ builder.clientBindings().bind("mbus://*/*", client);
+ driver.activateContainer(builder);
+ }
+
+ public SourceSession sourceSession() {
+ return session.session();
+ }
+
+ public Request newServerRequest() {
+ return new Request(driver, URI.create("mbus://localhost/"));
+ }
+
+ public Request newClientRequest(Message msg) {
+ msg.setRoute(Route.parse(server.connectionSpec()));
+ if (msg.getTrace().getLevel() == 0) {
+ msg.getTrace().setLevel(9);
+ }
+ final Request parent = newServerRequest();
+ try (final ResourceReference ref = References.fromResource(parent)) {
+ return new MbusRequest(parent, URI.create("mbus://remotehost/"), msg);
+ }
+ }
+
+ public boolean sendRequest(Request request, ResponseHandler responseHandler) {
+ request.connect(responseHandler).close(null);
+ return true;
+ }
+
+ public boolean sendMessage(Message msg, ResponseHandler responseHandler) {
+ final Request request = newClientRequest(msg);
+ try (final ResourceReference ref = References.fromResource(request)) {
+ return sendRequest(request, responseHandler);
+ }
+ }
+
+ public Message awaitMessage() {
+ Message msg = null;
+ try {
+ msg = server.awaitMessage(60, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ if (msg != null) {
+ msg.getTrace().trace(0, "Message received by RemoteServer.");
+ }
+ return msg;
+ }
+
+ public void sendReply(Reply reply) {
+ reply.getTrace().trace(0, "Sending reply from RemoteServer.");
+ server.sendReply(reply);
+ }
+
+ public boolean awaitMessageAndSendReply(Reply reply) {
+ Message msg = awaitMessage();
+ if (msg == null) {
+ return false;
+ }
+ reply.swapState(msg);
+ sendReply(reply);
+ return true;
+ }
+
+ public boolean close() {
+ session.release();
+ client.release();
+ server.close();
+ return driver.close();
+ }
+
+ public MbusClient client() {
+ return client;
+ }
+
+ public RemoteServer server() {
+ return server;
+ }
+
+ public static ClientTestDriver newInstance() {
+ return new ClientTestDriver(RemoteServer.newInstanceWithInternSlobrok(), new SimpleProtocol());
+ }
+
+ public static ClientTestDriver newInstanceWithProtocol(Protocol protocol) {
+ return new ClientTestDriver(RemoteServer.newInstanceWithInternSlobrok(), protocol);
+ }
+
+ public static ClientTestDriver newInstanceWithExternSlobrok(String slobrokId) {
+ return new ClientTestDriver(RemoteServer.newInstanceWithExternSlobrok(slobrokId), new SimpleProtocol());
+ }
+}
diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/MessageQueue.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/MessageQueue.java
new file mode 100644
index 00000000000..be280d416f4
--- /dev/null
+++ b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/MessageQueue.java
@@ -0,0 +1,27 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.jdisc.test;
+
+import com.yahoo.messagebus.Message;
+import com.yahoo.messagebus.MessageHandler;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public class MessageQueue implements MessageHandler {
+
+ private final BlockingQueue<Message> queue = new LinkedBlockingQueue<>();
+
+ @Override
+ public void handleMessage(Message msg) {
+ queue.add(msg);
+ }
+
+ public Message awaitMessage(int timeout, TimeUnit unit) throws InterruptedException {
+ return queue.poll(timeout, unit);
+ }
+
+}
diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteClient.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteClient.java
new file mode 100644
index 00000000000..e918ab46d95
--- /dev/null
+++ b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteClient.java
@@ -0,0 +1,73 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.jdisc.test;
+
+import com.yahoo.jrt.ListenFailedException;
+import com.yahoo.jrt.slobrok.server.Slobrok;
+import com.yahoo.messagebus.*;
+import com.yahoo.messagebus.network.rpc.RPCNetwork;
+import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
+import com.yahoo.messagebus.test.SimpleProtocol;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public class RemoteClient {
+
+ private final Slobrok slobrok;
+ private final String slobrokId;
+ private final MessageBus mbus;
+ private final ReplyQueue queue = new ReplyQueue();
+ private final SourceSession session;
+
+ private RemoteClient(Slobrok slobrok, String slobrokId, Protocol protocol) {
+ this.slobrok = slobrok;
+ this.slobrokId = slobrok != null ? slobrok.configId() : slobrokId;
+ mbus = new MessageBus(new RPCNetwork(new RPCNetworkParams().setSlobrokConfigId(this.slobrokId)),
+ new MessageBusParams().addProtocol(protocol));
+ session = mbus.createSourceSession(new SourceSessionParams().setThrottlePolicy(null).setReplyHandler(queue));
+ }
+
+ public Result sendMessage(Message msg) {
+ return session.send(msg);
+ }
+
+ public Reply awaitReply(int timeout, TimeUnit unit) throws InterruptedException {
+ return queue.awaitReply(timeout, unit);
+ }
+
+ public String slobrokId() {
+ return slobrokId;
+ }
+
+ public void close() {
+ session.destroy();
+ mbus.destroy();
+ if (slobrok != null) {
+ slobrok.stop();
+ }
+ }
+
+ public static RemoteClient newInstanceWithInternSlobrok() {
+ return new RemoteClient(newSlobrok(), null, new SimpleProtocol());
+ }
+
+ public static RemoteClient newInstanceWithExternSlobrok(String slobrokId) {
+ return new RemoteClient(null, slobrokId, new SimpleProtocol());
+ }
+
+ public static RemoteClient newInstanceWithProtocolAndInternSlobrok(Protocol protocol) {
+ return new RemoteClient(newSlobrok(), null, protocol);
+ }
+
+ private static Slobrok newSlobrok() {
+ Slobrok slobrok;
+ try {
+ slobrok = new Slobrok();
+ } catch (ListenFailedException e) {
+ throw new IllegalStateException(e);
+ }
+ return slobrok;
+ }
+}
diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteServer.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteServer.java
new file mode 100644
index 00000000000..8bd058eab9b
--- /dev/null
+++ b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteServer.java
@@ -0,0 +1,87 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.jdisc.test;
+
+import com.yahoo.jrt.ListenFailedException;
+import com.yahoo.jrt.slobrok.server.Slobrok;
+import com.yahoo.messagebus.*;
+import com.yahoo.messagebus.network.Identity;
+import com.yahoo.messagebus.network.rpc.RPCNetwork;
+import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
+import com.yahoo.messagebus.test.SimpleProtocol;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public class RemoteServer {
+
+ private final Slobrok slobrok;
+ private final String slobrokId;
+ private final MessageBus mbus;
+ private final MessageQueue queue = new MessageQueue();
+ private final DestinationSession session;
+
+ private RemoteServer(Slobrok slobrok, String slobrokId, Protocol protocol, String identity) {
+ this.slobrok = slobrok;
+ this.slobrokId = slobrok != null ? slobrok.configId() : slobrokId;
+ mbus = new MessageBus(new RPCNetwork(new RPCNetworkParams()
+ .setSlobrokConfigId(this.slobrokId)
+ .setIdentity(new Identity(identity))),
+ new MessageBusParams().addProtocol(protocol));
+ session = mbus.createDestinationSession(new DestinationSessionParams().setMessageHandler(queue));
+ }
+
+ public String connectionSpec() {
+ return session.getConnectionSpec();
+ }
+
+ public Message awaitMessage(int timeout, TimeUnit unit) throws InterruptedException {
+ return queue.awaitMessage(timeout, unit);
+ }
+
+ public void ackMessage(Message msg) {
+ session.acknowledge(msg);
+ }
+
+ public void sendReply(Reply reply) {
+ session.reply(reply);
+ }
+
+ public String slobrokId() {
+ return slobrokId;
+ }
+
+ public void close() {
+ session.destroy();
+ mbus.destroy();
+ if (slobrok != null) {
+ slobrok.stop();
+ }
+ }
+
+ public static RemoteServer newInstanceWithInternSlobrok() {
+ return new RemoteServer(newSlobrok(), null, new SimpleProtocol(), "remote");
+ }
+
+ public static RemoteServer newInstanceWithExternSlobrok(String slobrokId) {
+ return new RemoteServer(null, slobrokId, new SimpleProtocol(), "remote");
+ }
+
+ public static RemoteServer newInstance(String slobrokId, String identity, Protocol protocol) {
+ return new RemoteServer(null, slobrokId, protocol, identity);
+ }
+
+ public static RemoteServer newInstanceWithProtocol(Protocol protocol) {
+ return new RemoteServer(newSlobrok(), null, protocol, "remote");
+ }
+
+ private static Slobrok newSlobrok() {
+ try {
+ return new Slobrok();
+ } catch (ListenFailedException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+}
diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/ReplyQueue.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/ReplyQueue.java
new file mode 100644
index 00000000000..9418733a221
--- /dev/null
+++ b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/ReplyQueue.java
@@ -0,0 +1,26 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.jdisc.test;
+
+import com.yahoo.messagebus.Reply;
+import com.yahoo.messagebus.ReplyHandler;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public class ReplyQueue implements ReplyHandler {
+
+ private final BlockingQueue<Reply> queue = new LinkedBlockingQueue<>();
+
+ @Override
+ public void handleReply(Reply reply) {
+ queue.add(reply);
+ }
+
+ public Reply awaitReply(int timeout, TimeUnit unit) throws InterruptedException {
+ return queue.poll(timeout, unit);
+ }
+}
diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/ServerTestDriver.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/ServerTestDriver.java
new file mode 100644
index 00000000000..1ef492c18aa
--- /dev/null
+++ b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/ServerTestDriver.java
@@ -0,0 +1,154 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.jdisc.test;
+
+import com.google.inject.Module;
+import com.yahoo.jdisc.application.ContainerBuilder;
+import com.yahoo.jdisc.handler.RequestHandler;
+import com.yahoo.jdisc.test.TestDriver;
+import com.yahoo.messagebus.*;
+import com.yahoo.messagebus.Error;
+import com.yahoo.messagebus.jdisc.MbusServer;
+import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
+import com.yahoo.messagebus.routing.Route;
+import com.yahoo.messagebus.shared.ServerSession;
+import com.yahoo.messagebus.shared.SharedMessageBus;
+import com.yahoo.messagebus.test.SimpleProtocol;
+
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public class ServerTestDriver {
+
+ private final RemoteClient client;
+ private final MbusServer server;
+ private final TestDriver driver;
+
+ private ServerTestDriver(RemoteClient client, boolean activateContainer, RequestHandler requestHandler,
+ Protocol protocol, Module... guiceModules)
+ {
+ this.client = client;
+ driver = TestDriver.newSimpleApplicationInstanceWithoutOsgi(guiceModules);
+ if (activateContainer) {
+ ContainerBuilder builder = driver.newContainerBuilder();
+ if (requestHandler != null) {
+ builder.serverBindings().bind("mbus://*/*", requestHandler);
+ }
+ driver.activateContainer(builder);
+ }
+
+ MessageBusParams mbusParams = new MessageBusParams().addProtocol(protocol);
+ RPCNetworkParams netParams = new RPCNetworkParams().setSlobrokConfigId(client.slobrokId());
+ SharedMessageBus mbus = SharedMessageBus.newInstance(mbusParams, netParams);
+ ServerSession session = mbus.newDestinationSession(new DestinationSessionParams());
+ server = new MbusServer(driver, session);
+ server.start();
+ session.release();
+ mbus.release();
+ }
+
+ public boolean sendMessage(Message msg) {
+ msg.setRoute(Route.parse(server.connectionSpec()));
+ msg.getTrace().setLevel(9);
+ return client.sendMessage(msg).isAccepted();
+ }
+
+ public Reply awaitReply() {
+ Reply reply = null;
+ try {
+ reply = client.awaitReply(60, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ if (reply != null) {
+ System.out.println(reply.getTrace());
+ }
+ return reply;
+ }
+
+ public Reply awaitSuccess() {
+ Reply reply = awaitReply();
+ if (reply == null || reply.hasErrors()) {
+ return null;
+ }
+ return reply;
+ }
+
+ public Reply awaitErrors(Integer... errCodes) {
+ Reply reply = awaitReply();
+ if (reply == null) {
+ return null;
+ }
+ List<Integer> lst = new LinkedList<>(Arrays.asList(errCodes));
+ for (int i = 0, len = reply.getNumErrors(); i < len; ++i) {
+ Error err = reply.getError(i);
+ System.out.println(err);
+ int idx = lst.indexOf(err.getCode());
+ if (idx < 0) {
+ return null;
+ }
+ lst.remove(idx);
+ }
+ if (!lst.isEmpty()) {
+ return null;
+ }
+ return reply;
+ }
+
+ public boolean close() {
+ server.close();
+ server.release();
+ client.close();
+ return driver.close();
+ }
+
+ public TestDriver parent() {
+ return driver;
+ }
+
+ public RemoteClient client() {
+ return client;
+ }
+
+ public MbusServer server() {
+ return server;
+ }
+
+ public static ServerTestDriver newInstance(RequestHandler requestHandler, Module... guiceModules) {
+ return new ServerTestDriver(RemoteClient.newInstanceWithInternSlobrok(), true, requestHandler,
+ new SimpleProtocol(), guiceModules);
+ }
+
+ public static ServerTestDriver newInstanceWithProtocol(Protocol protocol, RequestHandler requestHandler,
+ Module... guiceModules)
+ {
+ return new ServerTestDriver(RemoteClient.newInstanceWithInternSlobrok(), true, requestHandler, protocol,
+ guiceModules);
+ }
+
+ public static ServerTestDriver newInstanceWithExternSlobrok(String slobrokId, RequestHandler requestHandler,
+ Module... guiceModules)
+ {
+ return new ServerTestDriver(RemoteClient.newInstanceWithExternSlobrok(slobrokId),
+ true, requestHandler, new SimpleProtocol(), guiceModules);
+ }
+
+ public static ServerTestDriver newInactiveInstance(Module... guiceModules) {
+ return new ServerTestDriver(RemoteClient.newInstanceWithInternSlobrok(), false, null,
+ new SimpleProtocol(), guiceModules);
+ }
+
+ public static ServerTestDriver newInactiveInstanceWithProtocol(Protocol protocol, Module... guiceModules) {
+ return new ServerTestDriver(RemoteClient.newInstanceWithProtocolAndInternSlobrok(protocol), false, null,
+ protocol, guiceModules);
+ }
+
+ public static ServerTestDriver newUnboundInstance(Module... guiceModules) {
+ return new ServerTestDriver(RemoteClient.newInstanceWithInternSlobrok(), true, null,
+ new SimpleProtocol(), guiceModules);
+ }
+}
diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/metrics/package-info.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/metrics/package-info.java
new file mode 100644
index 00000000000..0562757fd9d
--- /dev/null
+++ b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/metrics/package-info.java
@@ -0,0 +1,5 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+@ExportPackage
+package com.yahoo.messagebus.metrics;
+
+import com.yahoo.osgi.annotation.ExportPackage;
diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/network/package-info.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/network/package-info.java
new file mode 100644
index 00000000000..fbd3dbe2291
--- /dev/null
+++ b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/network/package-info.java
@@ -0,0 +1,5 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+@ExportPackage
+package com.yahoo.messagebus.network;
+
+import com.yahoo.osgi.annotation.ExportPackage;
diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/network/rpc/package-info.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/network/rpc/package-info.java
new file mode 100644
index 00000000000..7cbde7e54be
--- /dev/null
+++ b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/network/rpc/package-info.java
@@ -0,0 +1,5 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+@ExportPackage
+package com.yahoo.messagebus.network.rpc;
+
+import com.yahoo.osgi.annotation.ExportPackage;
diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/package-info.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/package-info.java
new file mode 100644
index 00000000000..1544a726470
--- /dev/null
+++ b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/package-info.java
@@ -0,0 +1,5 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+@ExportPackage
+package com.yahoo.messagebus;
+
+import com.yahoo.osgi.annotation.ExportPackage;
diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/routing/package-info.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/routing/package-info.java
new file mode 100644
index 00000000000..a060ac876fe
--- /dev/null
+++ b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/routing/package-info.java
@@ -0,0 +1,5 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+@ExportPackage
+package com.yahoo.messagebus.routing;
+
+import com.yahoo.osgi.annotation.ExportPackage;
diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/ClientSession.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/ClientSession.java
new file mode 100644
index 00000000000..7bf667fc120
--- /dev/null
+++ b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/ClientSession.java
@@ -0,0 +1,14 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.shared;
+
+import com.yahoo.jdisc.SharedResource;
+import com.yahoo.messagebus.Message;
+import com.yahoo.messagebus.Result;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public interface ClientSession extends SharedResource {
+
+ public Result sendMessage(Message msg);
+}
diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/NullNetwork.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/NullNetwork.java
new file mode 100644
index 00000000000..efafb491885
--- /dev/null
+++ b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/NullNetwork.java
@@ -0,0 +1,73 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.shared;
+
+import com.yahoo.jrt.slobrok.api.IMirror;
+import com.yahoo.messagebus.Message;
+import com.yahoo.messagebus.network.Network;
+import com.yahoo.messagebus.network.NetworkOwner;
+import com.yahoo.messagebus.routing.RoutingNode;
+
+import java.util.List;
+
+/**
+ * <p>Used by SharedMessageBus as a network when the container runs in LocalApplication with no network services.</p>
+ *
+ * @author <a href="mailto:vegardh@yahoo-inc.com">Vegard Havdal</a>
+ */
+class NullNetwork implements Network {
+
+ @Override
+ public boolean waitUntilReady(double seconds) {
+ return true;
+ }
+
+ @Override
+ public void attach(NetworkOwner owner) {
+
+ }
+
+ @Override
+ public void registerSession(String session) {
+
+ }
+
+ @Override
+ public void unregisterSession(String session) {
+
+ }
+
+ @Override
+ public boolean allocServiceAddress(RoutingNode recipient) {
+ return false;
+ }
+
+ @Override
+ public void freeServiceAddress(RoutingNode recipient) {
+
+ }
+
+ @Override
+ public void send(Message msg, List<RoutingNode> recipients) {
+
+ }
+
+ @Override
+ public void sync() {
+
+ }
+
+ @Override
+ public void shutdown() {
+
+ }
+
+ @Override
+ public String getConnectionSpec() {
+ return null;
+ }
+
+ @Override
+ public IMirror getMirror() {
+ return null;
+ }
+}
diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/ServerSession.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/ServerSession.java
new file mode 100644
index 00000000000..30e6ec1fb46
--- /dev/null
+++ b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/ServerSession.java
@@ -0,0 +1,22 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.shared;
+
+import com.yahoo.jdisc.SharedResource;
+import com.yahoo.messagebus.MessageHandler;
+import com.yahoo.messagebus.Reply;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public interface ServerSession extends SharedResource {
+
+ public MessageHandler getMessageHandler();
+
+ public void setMessageHandler(MessageHandler msgHandler);
+
+ public void sendReply(Reply reply);
+
+ public String connectionSpec();
+
+ public String name();
+}
diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/SharedDestinationSession.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/SharedDestinationSession.java
new file mode 100644
index 00000000000..08f366af048
--- /dev/null
+++ b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/SharedDestinationSession.java
@@ -0,0 +1,81 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.shared;
+
+import com.yahoo.jdisc.AbstractResource;
+import com.yahoo.jdisc.ResourceReference;
+import com.yahoo.log.LogLevel;
+import com.yahoo.messagebus.*;
+import com.yahoo.messagebus.Error;
+
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.logging.Logger;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public class SharedDestinationSession extends AbstractResource implements MessageHandler, ServerSession {
+
+ private static final Logger log = Logger.getLogger(SharedDestinationSession.class.getName());
+ private final AtomicReference<MessageHandler> msgHandler = new AtomicReference<>();
+ private final SharedMessageBus mbus;
+ private final DestinationSession session;
+ private final ResourceReference mbusReference;
+
+ public SharedDestinationSession(SharedMessageBus mbus, DestinationSessionParams params) {
+ this.mbus = mbus;
+ this.msgHandler.set(params.getMessageHandler());
+ this.session = mbus.messageBus().createDestinationSession(params.setMessageHandler(this));
+ this.mbusReference = mbus.refer();
+ }
+
+ public DestinationSession session() {
+ return session;
+ }
+
+ @Override
+ public void sendReply(Reply reply) {
+ session.reply(reply);
+ }
+
+ @Override
+ public MessageHandler getMessageHandler() {
+ return msgHandler.get();
+ }
+
+ @Override
+ public void setMessageHandler(MessageHandler msgHandler) {
+ if (!this.msgHandler.compareAndSet(null, msgHandler)) {
+ throw new IllegalStateException("Message handler already registered.");
+ }
+ }
+
+ @Override
+ public void handleMessage(Message msg) {
+ MessageHandler msgHandler = this.msgHandler.get();
+ if (msgHandler == null) {
+ Reply reply = new EmptyReply();
+ reply.swapState(msg);
+ reply.addError(new Error(ErrorCode.SESSION_BUSY, "Session not fully configured yet."));
+ sendReply(reply);
+ return;
+ }
+ msgHandler.handleMessage(msg);
+ }
+
+ @Override
+ public String connectionSpec() {
+ return session.getConnectionSpec();
+ }
+
+ @Override
+ public String name() {
+ return session.getName();
+ }
+
+ @Override
+ protected void destroy() {
+ log.log(LogLevel.DEBUG, "Destroying shared destination session.");
+ session.destroy();
+ mbusReference.close();
+ }
+}
diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/SharedIntermediateSession.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/SharedIntermediateSession.java
new file mode 100644
index 00000000000..0c90f602e44
--- /dev/null
+++ b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/SharedIntermediateSession.java
@@ -0,0 +1,98 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.shared;
+
+import com.yahoo.jdisc.AbstractResource;
+import com.yahoo.jdisc.ResourceReference;
+import com.yahoo.log.LogLevel;
+import com.yahoo.messagebus.*;
+import com.yahoo.messagebus.Error;
+
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.logging.Logger;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public class SharedIntermediateSession extends AbstractResource
+ implements ClientSession, ServerSession, MessageHandler, ReplyHandler
+{
+
+ private static final Logger log = Logger.getLogger(SharedIntermediateSession.class.getName());
+ private final AtomicReference<MessageHandler> msgHandler = new AtomicReference<>();
+ private final SharedMessageBus mbus;
+ private final IntermediateSession session;
+ private final ResourceReference mbusReference;
+
+ public SharedIntermediateSession(SharedMessageBus mbus, IntermediateSessionParams params) {
+ if (params.getReplyHandler() != null) {
+ throw new IllegalArgumentException("Reply handler must be null.");
+ }
+ this.mbus = mbus;
+ this.msgHandler.set(params.getMessageHandler());
+ this.session = mbus.messageBus().createIntermediateSession(params.setReplyHandler(this)
+ .setMessageHandler(this));
+ this.mbusReference = mbus.refer();
+ }
+
+ public IntermediateSession session() {
+ return session;
+ }
+
+ @Override
+ public Result sendMessage(Message msg) {
+ session.forward(msg);
+ return Result.ACCEPTED;
+ }
+
+ @Override
+ public void sendReply(Reply reply) {
+ session.forward(reply);
+ }
+
+ @Override
+ public MessageHandler getMessageHandler() {
+ return msgHandler.get();
+ }
+
+ @Override
+ public void setMessageHandler(MessageHandler msgHandler) {
+ if (!this.msgHandler.compareAndSet(null, msgHandler)) {
+ throw new IllegalStateException("Message handler already registered.");
+ }
+ }
+
+ @Override
+ public void handleMessage(Message msg) {
+ MessageHandler msgHandler = this.msgHandler.get();
+ if (msgHandler == null) {
+ Reply reply = new EmptyReply();
+ reply.swapState(msg);
+ reply.addError(new Error(ErrorCode.SESSION_BUSY, "Session not fully configured yet."));
+ sendReply(reply);
+ return;
+ }
+ msgHandler.handleMessage(msg);
+ }
+
+ @Override
+ public void handleReply(Reply reply) {
+ reply.popHandler().handleReply(reply);
+ }
+
+ @Override
+ public String connectionSpec() {
+ return session.getConnectionSpec();
+ }
+
+ @Override
+ public String name() {
+ return session.getName();
+ }
+
+ @Override
+ protected void destroy() {
+ log.log(LogLevel.DEBUG, "Destroying shared intermediate session.");
+ session.destroy();
+ mbusReference.close();
+ }
+}
diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/SharedMessageBus.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/SharedMessageBus.java
new file mode 100644
index 00000000000..bf1ce4c26ad
--- /dev/null
+++ b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/SharedMessageBus.java
@@ -0,0 +1,64 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.shared;
+
+import com.yahoo.config.subscription.ConfigGetter;
+import com.yahoo.jdisc.AbstractResource;
+import com.yahoo.log.LogLevel;
+import com.yahoo.messagebus.*;
+import com.yahoo.messagebus.network.Network;
+import com.yahoo.messagebus.network.rpc.RPCNetwork;
+import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
+import com.yahoo.cloud.config.SlobroksConfig;
+
+import java.util.logging.Logger;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public class SharedMessageBus extends AbstractResource {
+
+ private static final Logger log = Logger.getLogger(SharedMessageBus.class.getName());
+ private final MessageBus mbus;
+
+ public SharedMessageBus(MessageBus mbus) {
+ mbus.getClass(); // throws NullPointerException
+ this.mbus = mbus;
+ }
+
+ public MessageBus messageBus() {
+ return mbus;
+ }
+
+ @Override
+ protected void destroy() {
+ log.log(LogLevel.DEBUG, "Destroying shared message bus.");
+ mbus.destroy();
+ }
+
+ public SharedSourceSession newSourceSession(SourceSessionParams params) {
+ return new SharedSourceSession(this, params);
+ }
+
+ public SharedIntermediateSession newIntermediateSession(IntermediateSessionParams params) {
+ return new SharedIntermediateSession(this, params);
+ }
+
+ public SharedDestinationSession newDestinationSession(DestinationSessionParams params) {
+ return new SharedDestinationSession(this, params);
+ }
+
+ public static SharedMessageBus newInstance(MessageBusParams mbusParams, RPCNetworkParams netParams) {
+ return new SharedMessageBus(new MessageBus(newNetwork(netParams), mbusParams));
+ }
+
+ private static Network newNetwork(RPCNetworkParams params) {
+ SlobroksConfig cfg = params.getSlobroksConfig();
+ if (cfg == null) {
+ cfg = ConfigGetter.getConfig(SlobroksConfig.class, params.getSlobrokConfigId());
+ }
+ if (cfg.slobrok().isEmpty()) {
+ return new NullNetwork(); // for LocalApplication
+ }
+ return new RPCNetwork(params);
+ }
+}
diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/SharedSourceSession.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/SharedSourceSession.java
new file mode 100644
index 00000000000..2ce76a50b73
--- /dev/null
+++ b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/SharedSourceSession.java
@@ -0,0 +1,60 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.shared;
+
+import com.yahoo.jdisc.AbstractResource;
+import com.yahoo.jdisc.ResourceReference;
+import com.yahoo.log.LogLevel;
+import com.yahoo.messagebus.Message;
+import com.yahoo.messagebus.Reply;
+import com.yahoo.messagebus.ReplyHandler;
+import com.yahoo.messagebus.Result;
+import com.yahoo.messagebus.SourceSession;
+import com.yahoo.messagebus.SourceSessionParams;
+
+import java.util.logging.Logger;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public class SharedSourceSession extends AbstractResource implements ClientSession, ReplyHandler {
+
+ private static final Logger log = Logger.getLogger(SharedSourceSession.class.getName());
+ private final SharedMessageBus mbus;
+ private final SourceSession session;
+ private final ResourceReference mbusReference;
+
+ public SharedSourceSession(SharedMessageBus mbus, SourceSessionParams params) {
+ if (params.getReplyHandler() != null) {
+ throw new IllegalArgumentException("Reply handler must be null.");
+ }
+ this.mbus = mbus;
+ this.session = mbus.messageBus().createSourceSession(params.setReplyHandler(this));
+ this.mbusReference = mbus.refer();
+ }
+
+ public SourceSession session() {
+ return session;
+ }
+
+ @Override
+ public Result sendMessage(Message msg) {
+ return session.send(msg);
+ }
+
+ public Result sendMessageBlocking(Message msg) throws InterruptedException {
+ return session.sendBlocking(msg);
+ }
+
+ @Override
+ public void handleReply(Reply reply) {
+ reply.popHandler().handleReply(reply);
+ }
+
+ @Override
+ protected void destroy() {
+ log.log(LogLevel.DEBUG, "Destroying shared source session.");
+ session.close();
+ mbusReference.close();
+ }
+
+}
diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/package-info.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/package-info.java
new file mode 100644
index 00000000000..3090154a9a6
--- /dev/null
+++ b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/package-info.java
@@ -0,0 +1,8 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+/**
+ * Not a public API, exported for use in internal components.
+ */
+@ExportPackage
+package com.yahoo.messagebus.shared;
+
+import com.yahoo.osgi.annotation.ExportPackage;
diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/test/package-info.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/test/package-info.java
new file mode 100644
index 00000000000..266365ac747
--- /dev/null
+++ b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/test/package-info.java
@@ -0,0 +1,5 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+@ExportPackage
+package com.yahoo.messagebus.test;
+
+import com.yahoo.osgi.annotation.ExportPackage;