diff options
Diffstat (limited to 'jdisc_messagebus_service/src/main/java')
28 files changed, 1461 insertions, 0 deletions
diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/IgnoredCompletionHandler.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/IgnoredCompletionHandler.java new file mode 100644 index 00000000000..8c55be9cd89 --- /dev/null +++ b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/IgnoredCompletionHandler.java @@ -0,0 +1,22 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.jdisc; + +import com.yahoo.jdisc.handler.CompletionHandler; + +/** + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen Hult</a> + */ +enum IgnoredCompletionHandler implements CompletionHandler { + + INSTANCE; + + @Override + public void completed() { + + } + + @Override + public void failed(final Throwable t) { + + } +} diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/MbusClient.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/MbusClient.java new file mode 100644 index 00000000000..e1740433d83 --- /dev/null +++ b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/MbusClient.java @@ -0,0 +1,147 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.jdisc; + +import com.google.inject.Inject; +import com.yahoo.jdisc.AbstractResource; +import com.yahoo.jdisc.Request; +import com.yahoo.jdisc.ResourceReference; +import com.yahoo.jdisc.handler.ContentChannel; +import com.yahoo.jdisc.handler.RequestDeniedException; +import com.yahoo.jdisc.handler.ResponseHandler; +import com.yahoo.jdisc.service.ClientProvider; +import com.yahoo.log.LogLevel; +import com.yahoo.messagebus.EmptyReply; +import com.yahoo.messagebus.Error; +import com.yahoo.messagebus.ErrorCode; +import com.yahoo.messagebus.Message; +import com.yahoo.messagebus.Reply; +import com.yahoo.messagebus.ReplyHandler; +import com.yahoo.messagebus.shared.ClientSession; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.logging.Logger; + +/** + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +public final class MbusClient extends AbstractResource implements ClientProvider, ReplyHandler { + + private static final Logger log = Logger.getLogger(MbusClient.class.getName()); + private final BlockingQueue<MbusRequest> queue = new LinkedBlockingQueue<>(); + private final ClientSession session; + private final Thread thread = new Thread(new SenderTask(), "MbusClient"); + private volatile boolean done = false; + private final ResourceReference sessionReference; + + @Inject + public MbusClient(final ClientSession session) { + this.session = session; + this.sessionReference = session.refer(); + } + + @Override + public void start() { + thread.start(); + } + + @Override + public ContentChannel handleRequest(final Request request, final ResponseHandler handler) { + if (!(request instanceof MbusRequest)) { + throw new RequestDeniedException(request); + } + final Message msg = ((MbusRequest)request).getMessage(); + msg.getTrace().trace(6, "Request received by MbusClient."); + msg.pushHandler(null); // save user context + final Long timeout = request.timeRemaining(TimeUnit.MILLISECONDS); + if (timeout != null) { + msg.setTimeReceivedNow(); + msg.setTimeRemaining(Math.max(1, timeout)); // negative or zero timeout has semantics + } + msg.setContext(handler); + msg.pushHandler(this); + queue.add((MbusRequest)request); + return null; + } + + @Override + public void handleTimeout(final Request request, final ResponseHandler handler) { + // ignore, mbus has guaranteed reply + } + + @Override + protected void destroy() { + log.log(LogLevel.DEBUG, "Destroying message bus client."); + sessionReference.close(); + done = true; + } + + @Override + public void handleReply(final Reply reply) { + reply.getTrace().trace(6, "Reply received by MbusClient."); + final ResponseHandler handler = (ResponseHandler)reply.getContext(); + reply.popHandler(); // restore user context + try { + handler.handleResponse(new MbusResponse(StatusCodes.fromMbusReply(reply), reply)) + .close(IgnoredCompletionHandler.INSTANCE); + } catch (final Exception e) { + log.log(LogLevel.WARNING, "Ignoring exception thrown by ResponseHandler.", e); + } + } + + private void sendBlocking(final MbusRequest request) { + while (!sendMessage(request)) { + try { + Thread.sleep(100); + } catch (final InterruptedException e) { + // ignore + } + } + } + + private boolean sendMessage(final MbusRequest request) { + Error error; + final Long millis = request.timeRemaining(TimeUnit.MILLISECONDS); + if (millis != null && millis <= 0) { + error = new Error(ErrorCode.TIMEOUT, request.getTimeout(TimeUnit.MILLISECONDS) + " millis"); + } else if (request.isCancelled()) { + error = new Error(ErrorCode.APP_FATAL_ERROR, "request cancelled"); + } else { + try { + error = session.sendMessage(request.getMessage()).getError(); + } catch (final Exception e) { + error = new Error(ErrorCode.FATAL_ERROR, e.toString()); + } + } + if (error == null) { + return true; + } + if (error.isFatal()) { + final Reply reply = new EmptyReply(); + reply.swapState(request.getMessage()); + reply.addError(error); + reply.popHandler().handleReply(reply); + return true; + } + return false; + } + + private class SenderTask implements Runnable { + + @Override + public void run() { + while (!done) { + try { + final MbusRequest request = queue.poll(100, TimeUnit.MILLISECONDS); + if (request == null) { + continue; + } + sendBlocking(request); + } catch (final Exception e) { + log.log(LogLevel.WARNING, "Ignoring exception thrown by MbusClient.", e); + } + } + } + } +} diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/MbusRequest.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/MbusRequest.java new file mode 100644 index 00000000000..55c5f28ead6 --- /dev/null +++ b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/MbusRequest.java @@ -0,0 +1,38 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.jdisc; + +import com.yahoo.jdisc.Request; +import com.yahoo.jdisc.service.CurrentContainer; +import com.yahoo.messagebus.Message; + +import java.net.URI; + +/** + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +public class MbusRequest extends Request { + + private final Message message; + + public MbusRequest(CurrentContainer current, URI uri, Message msg) { + super(current, uri); + this.message = validateMessage(msg); + } + + public MbusRequest(Request parent, URI uri, Message msg) { + super(parent, uri); + this.message = validateMessage(msg); + } + + public Message getMessage() { + return message; + } + + private Message validateMessage(Message msg) { + if (msg != null) { + return msg; + } + release(); + throw new NullPointerException(); + } +} diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/MbusRequestHandler.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/MbusRequestHandler.java new file mode 100644 index 00000000000..73c143eb29d --- /dev/null +++ b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/MbusRequestHandler.java @@ -0,0 +1,59 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.jdisc; + +import com.yahoo.jdisc.Request; +import com.yahoo.jdisc.handler.AbstractRequestHandler; +import com.yahoo.jdisc.handler.CompletionHandler; +import com.yahoo.jdisc.handler.ContentChannel; +import com.yahoo.jdisc.handler.ResponseHandler; +import com.yahoo.messagebus.Message; +import com.yahoo.messagebus.MessageHandler; +import com.yahoo.messagebus.Reply; +import com.yahoo.messagebus.ReplyHandler; + +/** + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen Hult</a> + */ +public abstract class MbusRequestHandler extends AbstractRequestHandler implements MessageHandler { + + @Override + public ContentChannel handleRequest(final Request request, final ResponseHandler handler) { + if (!(request instanceof MbusRequest)) { + throw new UnsupportedOperationException("Expected MbusRequest, got " + request.getClass().getName() + "."); + } + final Message msg = ((MbusRequest)request).getMessage(); + msg.pushHandler(new RespondingReplyHandler(handler)); + handleMessage(msg); + return null; + } + + private static class RespondingReplyHandler implements ReplyHandler { + + private final ResponseHandler handler; + + RespondingReplyHandler(final ResponseHandler handler) { + this.handler = handler; + } + + @Override + public void handleReply(final Reply reply) { + final MbusResponse response = new MbusResponse(StatusCodes.fromMbusReply(reply), reply); + handler.handleResponse(response).close(IgnoringCompletionHandler.INSTANCE); + } + } + + private static class IgnoringCompletionHandler implements CompletionHandler { + + public static final IgnoringCompletionHandler INSTANCE = new IgnoringCompletionHandler(); + + @Override + public void completed() { + + } + + @Override + public void failed(final Throwable t) { + + } + } +} diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/MbusResponse.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/MbusResponse.java new file mode 100644 index 00000000000..23baa17b881 --- /dev/null +++ b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/MbusResponse.java @@ -0,0 +1,25 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.jdisc; + +import com.yahoo.jdisc.Response; +import com.yahoo.messagebus.Reply; + +/** + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +public class MbusResponse extends Response { + + private final Reply reply; + + public MbusResponse(int status, Reply reply) { + super(status); + if (reply == null) { + throw new NullPointerException(); + } + this.reply = reply; + } + + public Reply getReply() { + return reply; + } +} diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/MbusServer.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/MbusServer.java new file mode 100644 index 00000000000..6a8b321cac8 --- /dev/null +++ b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/MbusServer.java @@ -0,0 +1,135 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.jdisc; + +import com.google.inject.Inject; +import com.yahoo.jdisc.AbstractResource; +import com.yahoo.jdisc.Request; +import com.yahoo.jdisc.ResourceReference; +import com.yahoo.jdisc.Response; +import com.yahoo.jdisc.handler.ContentChannel; +import com.yahoo.jdisc.handler.ResponseHandler; +import com.yahoo.jdisc.service.CurrentContainer; +import com.yahoo.jdisc.service.ServerProvider; +import com.yahoo.log.LogLevel; +import com.yahoo.messagebus.*; +import com.yahoo.messagebus.Error; +import com.yahoo.messagebus.shared.ServerSession; + +import java.net.URI; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.logging.Logger; + +/** + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +public final class MbusServer extends AbstractResource implements ServerProvider, MessageHandler { + + private final static Logger log = Logger.getLogger(MbusServer.class.getName()); + private final AtomicBoolean running = new AtomicBoolean(false); + private final CurrentContainer container; + private final ServerSession session; + private final URI uri; + private final ResourceReference sessionReference; + + @Inject + public MbusServer(final CurrentContainer container, final ServerSession session) { + this.container = container; + this.session = session; + uri = URI.create("mbus://localhost/" + session.name()); + session.setMessageHandler(this); + sessionReference = session.refer(); + } + + @Override + public void start() { + log.log(LogLevel.DEBUG, "Starting message bus server."); + running.set(true); + } + + @Override + public void close() { + log.log(LogLevel.DEBUG, "Closing message bus server."); + running.set(false); + } + + @Override + protected void destroy() { + log.log(LogLevel.DEBUG, "Destroying message bus server."); + running.set(false); + sessionReference.close(); + } + + @Override + public void handleMessage(final Message msg) { + if (!running.get()) { + dispatchErrorReply(msg, ErrorCode.SESSION_BUSY, "Session temporarily closed."); + return; + } + if (msg.getTrace().shouldTrace(6)) { + msg.getTrace().trace(6, "Message received by MbusServer."); + } + Request request = null; + ContentChannel content = null; + try { + request = new MbusRequest(container, uri, msg); + content = request.connect(new ServerResponseHandler(msg)); + } catch (final RuntimeException e) { + dispatchErrorReply(msg, ErrorCode.APP_FATAL_ERROR, e.toString()); + } finally { + if (request != null) { + request.release(); + } + } + if (content != null) { + content.close(IgnoredCompletionHandler.INSTANCE); + } + } + + public String connectionSpec() { + return session.connectionSpec(); + } + + private void dispatchErrorReply(final Message msg, final int errCode, final String errMsg) { + final Reply reply = new EmptyReply(); + reply.swapState(msg); + reply.addError(new Error(errCode, errMsg)); + session.sendReply(reply); + } + + private class ServerResponseHandler implements ResponseHandler { + + final Message msg; + + ServerResponseHandler(final Message msg) { + this.msg = msg; + } + + @Override + public ContentChannel handleResponse(final Response response) { + final Reply reply; + if (response instanceof MbusResponse) { + reply = ((MbusResponse)response).getReply(); + } else { + reply = new EmptyReply(); + reply.swapState(msg); + } + final Error err = StatusCodes.toMbusError(response.getStatus()); + if (err != null) { + if (err.isFatal()) { + if (!reply.hasFatalErrors()) { + reply.addError(err); + } + } else { + if (!reply.hasErrors()) { + reply.addError(err); + } + } + } + if (reply.getTrace().shouldTrace(6)) { + reply.getTrace().trace(6, "Sending reply from MbusServer."); + } + session.sendReply(reply); + return null; + } + } +} diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/StatusCodes.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/StatusCodes.java new file mode 100644 index 00000000000..b7543054b8c --- /dev/null +++ b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/StatusCodes.java @@ -0,0 +1,79 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.jdisc; + +import com.yahoo.jdisc.Response; +import com.yahoo.messagebus.Error; +import com.yahoo.messagebus.ErrorCode; +import com.yahoo.messagebus.Reply; + +/** + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen Hult</a> + */ +public class StatusCodes { + + public static int fromMbusReply(final Reply reply) { + int statusCode = Response.Status.OK; + for (int i = 0, len = reply.getNumErrors(); i < len; ++i) { + statusCode = Math.max(statusCode, fromMbusError(reply.getError(i))); + } + return statusCode; + } + + public static int fromMbusError(final Error error) { + final int errorCode = error.getCode(); + if (errorCode < ErrorCode.TRANSIENT_ERROR) { + return Response.Status.OK; + } + if (errorCode < ErrorCode.FATAL_ERROR) { + return Response.Status.TEMPORARY_REDIRECT; + } + switch (errorCode) { + case ErrorCode.SEND_QUEUE_CLOSED: + return Response.Status.LOCKED; + case ErrorCode.ILLEGAL_ROUTE: + return Response.Status.BAD_REQUEST; + case ErrorCode.NO_SERVICES_FOR_ROUTE: + return Response.Status.NOT_FOUND; + case ErrorCode.SERVICE_OOS: + return Response.Status.SERVICE_UNAVAILABLE; + case ErrorCode.ENCODE_ERROR: + return Response.Status.BAD_REQUEST; + case ErrorCode.NETWORK_ERROR: + return Response.Status.BAD_REQUEST; // got nothing better + case ErrorCode.UNKNOWN_PROTOCOL: + return Response.Status.UNSUPPORTED_MEDIA_TYPE; + case ErrorCode.DECODE_ERROR: + return Response.Status.UNSUPPORTED_MEDIA_TYPE; + case ErrorCode.TIMEOUT: + return Response.Status.REQUEST_TIMEOUT; + case ErrorCode.INCOMPATIBLE_VERSION: + return Response.Status.VERSION_NOT_SUPPORTED; + case ErrorCode.UNKNOWN_POLICY: + return Response.Status.BAD_REQUEST; + case ErrorCode.NETWORK_SHUTDOWN: + return Response.Status.LOCKED; + case ErrorCode.POLICY_ERROR: + return Response.Status.PRECONDITION_FAILED; + case ErrorCode.SEQUENCE_ERROR: + return Response.Status.PRECONDITION_FAILED; + case ErrorCode.APP_FATAL_ERROR: + return Response.Status.INTERNAL_SERVER_ERROR; + default: + return Response.Status.INTERNAL_SERVER_ERROR; + } + } + + public static Error toMbusError(final int statusCode) { + if (statusCode < 300) { + return null; + } else if (statusCode < 400) { + return new Error(ErrorCode.APP_TRANSIENT_ERROR, statusCode + " Redirection"); + } else if (statusCode < 500) { + return new Error(ErrorCode.APP_FATAL_ERROR, statusCode + " Client Error"); + } else if (statusCode < 600) { + return new Error(ErrorCode.APP_FATAL_ERROR, statusCode + " Server Error"); + } else { + return new Error(ErrorCode.APP_FATAL_ERROR, statusCode + " Unknown Error"); + } + } +} diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/package-info.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/package-info.java new file mode 100644 index 00000000000..99c9a28b381 --- /dev/null +++ b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/package-info.java @@ -0,0 +1,5 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +@ExportPackage +package com.yahoo.messagebus.jdisc; + +import com.yahoo.osgi.annotation.ExportPackage; diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/ClientTestDriver.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/ClientTestDriver.java new file mode 100644 index 00000000000..b478f9fe8db --- /dev/null +++ b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/ClientTestDriver.java @@ -0,0 +1,134 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.jdisc.test; + +import com.yahoo.jdisc.References; +import com.yahoo.jdisc.Request; +import com.yahoo.jdisc.ResourceReference; +import com.yahoo.jdisc.application.ContainerBuilder; +import com.yahoo.jdisc.handler.ResponseHandler; +import com.yahoo.jdisc.test.TestDriver; +import com.yahoo.messagebus.*; +import com.yahoo.messagebus.jdisc.MbusClient; +import com.yahoo.messagebus.jdisc.MbusRequest; +import com.yahoo.messagebus.network.rpc.RPCNetworkParams; +import com.yahoo.messagebus.routing.Route; +import com.yahoo.messagebus.shared.SharedMessageBus; +import com.yahoo.messagebus.shared.SharedSourceSession; +import com.yahoo.messagebus.test.SimpleProtocol; + +import java.net.URI; +import java.util.concurrent.TimeUnit; + +/** + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +public class ClientTestDriver { + + private final RemoteServer server; + private final MbusClient client; + private final SharedSourceSession session; + private final TestDriver driver; + + private ClientTestDriver(RemoteServer server, Protocol protocol) { + this.server = server; + + MessageBusParams mbusParams = new MessageBusParams().addProtocol(protocol); + RPCNetworkParams netParams = new RPCNetworkParams().setSlobrokConfigId(server.slobrokId()); + SharedMessageBus mbus = SharedMessageBus.newInstance(mbusParams, netParams); + session = mbus.newSourceSession(new SourceSessionParams()); + client = new MbusClient(session); + client.start(); + mbus.release(); + + driver = TestDriver.newSimpleApplicationInstanceWithoutOsgi(); + ContainerBuilder builder = driver.newContainerBuilder(); + builder.clientBindings().bind("mbus://*/*", client); + driver.activateContainer(builder); + } + + public SourceSession sourceSession() { + return session.session(); + } + + public Request newServerRequest() { + return new Request(driver, URI.create("mbus://localhost/")); + } + + public Request newClientRequest(Message msg) { + msg.setRoute(Route.parse(server.connectionSpec())); + if (msg.getTrace().getLevel() == 0) { + msg.getTrace().setLevel(9); + } + final Request parent = newServerRequest(); + try (final ResourceReference ref = References.fromResource(parent)) { + return new MbusRequest(parent, URI.create("mbus://remotehost/"), msg); + } + } + + public boolean sendRequest(Request request, ResponseHandler responseHandler) { + request.connect(responseHandler).close(null); + return true; + } + + public boolean sendMessage(Message msg, ResponseHandler responseHandler) { + final Request request = newClientRequest(msg); + try (final ResourceReference ref = References.fromResource(request)) { + return sendRequest(request, responseHandler); + } + } + + public Message awaitMessage() { + Message msg = null; + try { + msg = server.awaitMessage(60, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + if (msg != null) { + msg.getTrace().trace(0, "Message received by RemoteServer."); + } + return msg; + } + + public void sendReply(Reply reply) { + reply.getTrace().trace(0, "Sending reply from RemoteServer."); + server.sendReply(reply); + } + + public boolean awaitMessageAndSendReply(Reply reply) { + Message msg = awaitMessage(); + if (msg == null) { + return false; + } + reply.swapState(msg); + sendReply(reply); + return true; + } + + public boolean close() { + session.release(); + client.release(); + server.close(); + return driver.close(); + } + + public MbusClient client() { + return client; + } + + public RemoteServer server() { + return server; + } + + public static ClientTestDriver newInstance() { + return new ClientTestDriver(RemoteServer.newInstanceWithInternSlobrok(), new SimpleProtocol()); + } + + public static ClientTestDriver newInstanceWithProtocol(Protocol protocol) { + return new ClientTestDriver(RemoteServer.newInstanceWithInternSlobrok(), protocol); + } + + public static ClientTestDriver newInstanceWithExternSlobrok(String slobrokId) { + return new ClientTestDriver(RemoteServer.newInstanceWithExternSlobrok(slobrokId), new SimpleProtocol()); + } +} diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/MessageQueue.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/MessageQueue.java new file mode 100644 index 00000000000..be280d416f4 --- /dev/null +++ b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/MessageQueue.java @@ -0,0 +1,27 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.jdisc.test; + +import com.yahoo.messagebus.Message; +import com.yahoo.messagebus.MessageHandler; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +/** + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +public class MessageQueue implements MessageHandler { + + private final BlockingQueue<Message> queue = new LinkedBlockingQueue<>(); + + @Override + public void handleMessage(Message msg) { + queue.add(msg); + } + + public Message awaitMessage(int timeout, TimeUnit unit) throws InterruptedException { + return queue.poll(timeout, unit); + } + +} diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteClient.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteClient.java new file mode 100644 index 00000000000..e918ab46d95 --- /dev/null +++ b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteClient.java @@ -0,0 +1,73 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.jdisc.test; + +import com.yahoo.jrt.ListenFailedException; +import com.yahoo.jrt.slobrok.server.Slobrok; +import com.yahoo.messagebus.*; +import com.yahoo.messagebus.network.rpc.RPCNetwork; +import com.yahoo.messagebus.network.rpc.RPCNetworkParams; +import com.yahoo.messagebus.test.SimpleProtocol; + +import java.util.concurrent.TimeUnit; + +/** + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +public class RemoteClient { + + private final Slobrok slobrok; + private final String slobrokId; + private final MessageBus mbus; + private final ReplyQueue queue = new ReplyQueue(); + private final SourceSession session; + + private RemoteClient(Slobrok slobrok, String slobrokId, Protocol protocol) { + this.slobrok = slobrok; + this.slobrokId = slobrok != null ? slobrok.configId() : slobrokId; + mbus = new MessageBus(new RPCNetwork(new RPCNetworkParams().setSlobrokConfigId(this.slobrokId)), + new MessageBusParams().addProtocol(protocol)); + session = mbus.createSourceSession(new SourceSessionParams().setThrottlePolicy(null).setReplyHandler(queue)); + } + + public Result sendMessage(Message msg) { + return session.send(msg); + } + + public Reply awaitReply(int timeout, TimeUnit unit) throws InterruptedException { + return queue.awaitReply(timeout, unit); + } + + public String slobrokId() { + return slobrokId; + } + + public void close() { + session.destroy(); + mbus.destroy(); + if (slobrok != null) { + slobrok.stop(); + } + } + + public static RemoteClient newInstanceWithInternSlobrok() { + return new RemoteClient(newSlobrok(), null, new SimpleProtocol()); + } + + public static RemoteClient newInstanceWithExternSlobrok(String slobrokId) { + return new RemoteClient(null, slobrokId, new SimpleProtocol()); + } + + public static RemoteClient newInstanceWithProtocolAndInternSlobrok(Protocol protocol) { + return new RemoteClient(newSlobrok(), null, protocol); + } + + private static Slobrok newSlobrok() { + Slobrok slobrok; + try { + slobrok = new Slobrok(); + } catch (ListenFailedException e) { + throw new IllegalStateException(e); + } + return slobrok; + } +} diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteServer.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteServer.java new file mode 100644 index 00000000000..8bd058eab9b --- /dev/null +++ b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteServer.java @@ -0,0 +1,87 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.jdisc.test; + +import com.yahoo.jrt.ListenFailedException; +import com.yahoo.jrt.slobrok.server.Slobrok; +import com.yahoo.messagebus.*; +import com.yahoo.messagebus.network.Identity; +import com.yahoo.messagebus.network.rpc.RPCNetwork; +import com.yahoo.messagebus.network.rpc.RPCNetworkParams; +import com.yahoo.messagebus.test.SimpleProtocol; + +import java.util.concurrent.TimeUnit; + +/** + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +public class RemoteServer { + + private final Slobrok slobrok; + private final String slobrokId; + private final MessageBus mbus; + private final MessageQueue queue = new MessageQueue(); + private final DestinationSession session; + + private RemoteServer(Slobrok slobrok, String slobrokId, Protocol protocol, String identity) { + this.slobrok = slobrok; + this.slobrokId = slobrok != null ? slobrok.configId() : slobrokId; + mbus = new MessageBus(new RPCNetwork(new RPCNetworkParams() + .setSlobrokConfigId(this.slobrokId) + .setIdentity(new Identity(identity))), + new MessageBusParams().addProtocol(protocol)); + session = mbus.createDestinationSession(new DestinationSessionParams().setMessageHandler(queue)); + } + + public String connectionSpec() { + return session.getConnectionSpec(); + } + + public Message awaitMessage(int timeout, TimeUnit unit) throws InterruptedException { + return queue.awaitMessage(timeout, unit); + } + + public void ackMessage(Message msg) { + session.acknowledge(msg); + } + + public void sendReply(Reply reply) { + session.reply(reply); + } + + public String slobrokId() { + return slobrokId; + } + + public void close() { + session.destroy(); + mbus.destroy(); + if (slobrok != null) { + slobrok.stop(); + } + } + + public static RemoteServer newInstanceWithInternSlobrok() { + return new RemoteServer(newSlobrok(), null, new SimpleProtocol(), "remote"); + } + + public static RemoteServer newInstanceWithExternSlobrok(String slobrokId) { + return new RemoteServer(null, slobrokId, new SimpleProtocol(), "remote"); + } + + public static RemoteServer newInstance(String slobrokId, String identity, Protocol protocol) { + return new RemoteServer(null, slobrokId, protocol, identity); + } + + public static RemoteServer newInstanceWithProtocol(Protocol protocol) { + return new RemoteServer(newSlobrok(), null, protocol, "remote"); + } + + private static Slobrok newSlobrok() { + try { + return new Slobrok(); + } catch (ListenFailedException e) { + throw new IllegalStateException(e); + } + } + +} diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/ReplyQueue.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/ReplyQueue.java new file mode 100644 index 00000000000..9418733a221 --- /dev/null +++ b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/ReplyQueue.java @@ -0,0 +1,26 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.jdisc.test; + +import com.yahoo.messagebus.Reply; +import com.yahoo.messagebus.ReplyHandler; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +/** + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +public class ReplyQueue implements ReplyHandler { + + private final BlockingQueue<Reply> queue = new LinkedBlockingQueue<>(); + + @Override + public void handleReply(Reply reply) { + queue.add(reply); + } + + public Reply awaitReply(int timeout, TimeUnit unit) throws InterruptedException { + return queue.poll(timeout, unit); + } +} diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/ServerTestDriver.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/ServerTestDriver.java new file mode 100644 index 00000000000..1ef492c18aa --- /dev/null +++ b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/ServerTestDriver.java @@ -0,0 +1,154 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.jdisc.test; + +import com.google.inject.Module; +import com.yahoo.jdisc.application.ContainerBuilder; +import com.yahoo.jdisc.handler.RequestHandler; +import com.yahoo.jdisc.test.TestDriver; +import com.yahoo.messagebus.*; +import com.yahoo.messagebus.Error; +import com.yahoo.messagebus.jdisc.MbusServer; +import com.yahoo.messagebus.network.rpc.RPCNetworkParams; +import com.yahoo.messagebus.routing.Route; +import com.yahoo.messagebus.shared.ServerSession; +import com.yahoo.messagebus.shared.SharedMessageBus; +import com.yahoo.messagebus.test.SimpleProtocol; + +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +/** + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +public class ServerTestDriver { + + private final RemoteClient client; + private final MbusServer server; + private final TestDriver driver; + + private ServerTestDriver(RemoteClient client, boolean activateContainer, RequestHandler requestHandler, + Protocol protocol, Module... guiceModules) + { + this.client = client; + driver = TestDriver.newSimpleApplicationInstanceWithoutOsgi(guiceModules); + if (activateContainer) { + ContainerBuilder builder = driver.newContainerBuilder(); + if (requestHandler != null) { + builder.serverBindings().bind("mbus://*/*", requestHandler); + } + driver.activateContainer(builder); + } + + MessageBusParams mbusParams = new MessageBusParams().addProtocol(protocol); + RPCNetworkParams netParams = new RPCNetworkParams().setSlobrokConfigId(client.slobrokId()); + SharedMessageBus mbus = SharedMessageBus.newInstance(mbusParams, netParams); + ServerSession session = mbus.newDestinationSession(new DestinationSessionParams()); + server = new MbusServer(driver, session); + server.start(); + session.release(); + mbus.release(); + } + + public boolean sendMessage(Message msg) { + msg.setRoute(Route.parse(server.connectionSpec())); + msg.getTrace().setLevel(9); + return client.sendMessage(msg).isAccepted(); + } + + public Reply awaitReply() { + Reply reply = null; + try { + reply = client.awaitReply(60, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + if (reply != null) { + System.out.println(reply.getTrace()); + } + return reply; + } + + public Reply awaitSuccess() { + Reply reply = awaitReply(); + if (reply == null || reply.hasErrors()) { + return null; + } + return reply; + } + + public Reply awaitErrors(Integer... errCodes) { + Reply reply = awaitReply(); + if (reply == null) { + return null; + } + List<Integer> lst = new LinkedList<>(Arrays.asList(errCodes)); + for (int i = 0, len = reply.getNumErrors(); i < len; ++i) { + Error err = reply.getError(i); + System.out.println(err); + int idx = lst.indexOf(err.getCode()); + if (idx < 0) { + return null; + } + lst.remove(idx); + } + if (!lst.isEmpty()) { + return null; + } + return reply; + } + + public boolean close() { + server.close(); + server.release(); + client.close(); + return driver.close(); + } + + public TestDriver parent() { + return driver; + } + + public RemoteClient client() { + return client; + } + + public MbusServer server() { + return server; + } + + public static ServerTestDriver newInstance(RequestHandler requestHandler, Module... guiceModules) { + return new ServerTestDriver(RemoteClient.newInstanceWithInternSlobrok(), true, requestHandler, + new SimpleProtocol(), guiceModules); + } + + public static ServerTestDriver newInstanceWithProtocol(Protocol protocol, RequestHandler requestHandler, + Module... guiceModules) + { + return new ServerTestDriver(RemoteClient.newInstanceWithInternSlobrok(), true, requestHandler, protocol, + guiceModules); + } + + public static ServerTestDriver newInstanceWithExternSlobrok(String slobrokId, RequestHandler requestHandler, + Module... guiceModules) + { + return new ServerTestDriver(RemoteClient.newInstanceWithExternSlobrok(slobrokId), + true, requestHandler, new SimpleProtocol(), guiceModules); + } + + public static ServerTestDriver newInactiveInstance(Module... guiceModules) { + return new ServerTestDriver(RemoteClient.newInstanceWithInternSlobrok(), false, null, + new SimpleProtocol(), guiceModules); + } + + public static ServerTestDriver newInactiveInstanceWithProtocol(Protocol protocol, Module... guiceModules) { + return new ServerTestDriver(RemoteClient.newInstanceWithProtocolAndInternSlobrok(protocol), false, null, + protocol, guiceModules); + } + + public static ServerTestDriver newUnboundInstance(Module... guiceModules) { + return new ServerTestDriver(RemoteClient.newInstanceWithInternSlobrok(), true, null, + new SimpleProtocol(), guiceModules); + } +} diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/metrics/package-info.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/metrics/package-info.java new file mode 100644 index 00000000000..0562757fd9d --- /dev/null +++ b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/metrics/package-info.java @@ -0,0 +1,5 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +@ExportPackage +package com.yahoo.messagebus.metrics; + +import com.yahoo.osgi.annotation.ExportPackage; diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/network/package-info.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/network/package-info.java new file mode 100644 index 00000000000..fbd3dbe2291 --- /dev/null +++ b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/network/package-info.java @@ -0,0 +1,5 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +@ExportPackage +package com.yahoo.messagebus.network; + +import com.yahoo.osgi.annotation.ExportPackage; diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/network/rpc/package-info.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/network/rpc/package-info.java new file mode 100644 index 00000000000..7cbde7e54be --- /dev/null +++ b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/network/rpc/package-info.java @@ -0,0 +1,5 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +@ExportPackage +package com.yahoo.messagebus.network.rpc; + +import com.yahoo.osgi.annotation.ExportPackage; diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/package-info.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/package-info.java new file mode 100644 index 00000000000..1544a726470 --- /dev/null +++ b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/package-info.java @@ -0,0 +1,5 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +@ExportPackage +package com.yahoo.messagebus; + +import com.yahoo.osgi.annotation.ExportPackage; diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/routing/package-info.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/routing/package-info.java new file mode 100644 index 00000000000..a060ac876fe --- /dev/null +++ b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/routing/package-info.java @@ -0,0 +1,5 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +@ExportPackage +package com.yahoo.messagebus.routing; + +import com.yahoo.osgi.annotation.ExportPackage; diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/ClientSession.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/ClientSession.java new file mode 100644 index 00000000000..7bf667fc120 --- /dev/null +++ b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/ClientSession.java @@ -0,0 +1,14 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.shared; + +import com.yahoo.jdisc.SharedResource; +import com.yahoo.messagebus.Message; +import com.yahoo.messagebus.Result; + +/** + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +public interface ClientSession extends SharedResource { + + public Result sendMessage(Message msg); +} diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/NullNetwork.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/NullNetwork.java new file mode 100644 index 00000000000..efafb491885 --- /dev/null +++ b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/NullNetwork.java @@ -0,0 +1,73 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.shared; + +import com.yahoo.jrt.slobrok.api.IMirror; +import com.yahoo.messagebus.Message; +import com.yahoo.messagebus.network.Network; +import com.yahoo.messagebus.network.NetworkOwner; +import com.yahoo.messagebus.routing.RoutingNode; + +import java.util.List; + +/** + * <p>Used by SharedMessageBus as a network when the container runs in LocalApplication with no network services.</p> + * + * @author <a href="mailto:vegardh@yahoo-inc.com">Vegard Havdal</a> + */ +class NullNetwork implements Network { + + @Override + public boolean waitUntilReady(double seconds) { + return true; + } + + @Override + public void attach(NetworkOwner owner) { + + } + + @Override + public void registerSession(String session) { + + } + + @Override + public void unregisterSession(String session) { + + } + + @Override + public boolean allocServiceAddress(RoutingNode recipient) { + return false; + } + + @Override + public void freeServiceAddress(RoutingNode recipient) { + + } + + @Override + public void send(Message msg, List<RoutingNode> recipients) { + + } + + @Override + public void sync() { + + } + + @Override + public void shutdown() { + + } + + @Override + public String getConnectionSpec() { + return null; + } + + @Override + public IMirror getMirror() { + return null; + } +} diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/ServerSession.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/ServerSession.java new file mode 100644 index 00000000000..30e6ec1fb46 --- /dev/null +++ b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/ServerSession.java @@ -0,0 +1,22 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.shared; + +import com.yahoo.jdisc.SharedResource; +import com.yahoo.messagebus.MessageHandler; +import com.yahoo.messagebus.Reply; + +/** + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +public interface ServerSession extends SharedResource { + + public MessageHandler getMessageHandler(); + + public void setMessageHandler(MessageHandler msgHandler); + + public void sendReply(Reply reply); + + public String connectionSpec(); + + public String name(); +} diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/SharedDestinationSession.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/SharedDestinationSession.java new file mode 100644 index 00000000000..08f366af048 --- /dev/null +++ b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/SharedDestinationSession.java @@ -0,0 +1,81 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.shared; + +import com.yahoo.jdisc.AbstractResource; +import com.yahoo.jdisc.ResourceReference; +import com.yahoo.log.LogLevel; +import com.yahoo.messagebus.*; +import com.yahoo.messagebus.Error; + +import java.util.concurrent.atomic.AtomicReference; +import java.util.logging.Logger; + +/** + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +public class SharedDestinationSession extends AbstractResource implements MessageHandler, ServerSession { + + private static final Logger log = Logger.getLogger(SharedDestinationSession.class.getName()); + private final AtomicReference<MessageHandler> msgHandler = new AtomicReference<>(); + private final SharedMessageBus mbus; + private final DestinationSession session; + private final ResourceReference mbusReference; + + public SharedDestinationSession(SharedMessageBus mbus, DestinationSessionParams params) { + this.mbus = mbus; + this.msgHandler.set(params.getMessageHandler()); + this.session = mbus.messageBus().createDestinationSession(params.setMessageHandler(this)); + this.mbusReference = mbus.refer(); + } + + public DestinationSession session() { + return session; + } + + @Override + public void sendReply(Reply reply) { + session.reply(reply); + } + + @Override + public MessageHandler getMessageHandler() { + return msgHandler.get(); + } + + @Override + public void setMessageHandler(MessageHandler msgHandler) { + if (!this.msgHandler.compareAndSet(null, msgHandler)) { + throw new IllegalStateException("Message handler already registered."); + } + } + + @Override + public void handleMessage(Message msg) { + MessageHandler msgHandler = this.msgHandler.get(); + if (msgHandler == null) { + Reply reply = new EmptyReply(); + reply.swapState(msg); + reply.addError(new Error(ErrorCode.SESSION_BUSY, "Session not fully configured yet.")); + sendReply(reply); + return; + } + msgHandler.handleMessage(msg); + } + + @Override + public String connectionSpec() { + return session.getConnectionSpec(); + } + + @Override + public String name() { + return session.getName(); + } + + @Override + protected void destroy() { + log.log(LogLevel.DEBUG, "Destroying shared destination session."); + session.destroy(); + mbusReference.close(); + } +} diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/SharedIntermediateSession.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/SharedIntermediateSession.java new file mode 100644 index 00000000000..0c90f602e44 --- /dev/null +++ b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/SharedIntermediateSession.java @@ -0,0 +1,98 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.shared; + +import com.yahoo.jdisc.AbstractResource; +import com.yahoo.jdisc.ResourceReference; +import com.yahoo.log.LogLevel; +import com.yahoo.messagebus.*; +import com.yahoo.messagebus.Error; + +import java.util.concurrent.atomic.AtomicReference; +import java.util.logging.Logger; + +/** + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +public class SharedIntermediateSession extends AbstractResource + implements ClientSession, ServerSession, MessageHandler, ReplyHandler +{ + + private static final Logger log = Logger.getLogger(SharedIntermediateSession.class.getName()); + private final AtomicReference<MessageHandler> msgHandler = new AtomicReference<>(); + private final SharedMessageBus mbus; + private final IntermediateSession session; + private final ResourceReference mbusReference; + + public SharedIntermediateSession(SharedMessageBus mbus, IntermediateSessionParams params) { + if (params.getReplyHandler() != null) { + throw new IllegalArgumentException("Reply handler must be null."); + } + this.mbus = mbus; + this.msgHandler.set(params.getMessageHandler()); + this.session = mbus.messageBus().createIntermediateSession(params.setReplyHandler(this) + .setMessageHandler(this)); + this.mbusReference = mbus.refer(); + } + + public IntermediateSession session() { + return session; + } + + @Override + public Result sendMessage(Message msg) { + session.forward(msg); + return Result.ACCEPTED; + } + + @Override + public void sendReply(Reply reply) { + session.forward(reply); + } + + @Override + public MessageHandler getMessageHandler() { + return msgHandler.get(); + } + + @Override + public void setMessageHandler(MessageHandler msgHandler) { + if (!this.msgHandler.compareAndSet(null, msgHandler)) { + throw new IllegalStateException("Message handler already registered."); + } + } + + @Override + public void handleMessage(Message msg) { + MessageHandler msgHandler = this.msgHandler.get(); + if (msgHandler == null) { + Reply reply = new EmptyReply(); + reply.swapState(msg); + reply.addError(new Error(ErrorCode.SESSION_BUSY, "Session not fully configured yet.")); + sendReply(reply); + return; + } + msgHandler.handleMessage(msg); + } + + @Override + public void handleReply(Reply reply) { + reply.popHandler().handleReply(reply); + } + + @Override + public String connectionSpec() { + return session.getConnectionSpec(); + } + + @Override + public String name() { + return session.getName(); + } + + @Override + protected void destroy() { + log.log(LogLevel.DEBUG, "Destroying shared intermediate session."); + session.destroy(); + mbusReference.close(); + } +} diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/SharedMessageBus.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/SharedMessageBus.java new file mode 100644 index 00000000000..bf1ce4c26ad --- /dev/null +++ b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/SharedMessageBus.java @@ -0,0 +1,64 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.shared; + +import com.yahoo.config.subscription.ConfigGetter; +import com.yahoo.jdisc.AbstractResource; +import com.yahoo.log.LogLevel; +import com.yahoo.messagebus.*; +import com.yahoo.messagebus.network.Network; +import com.yahoo.messagebus.network.rpc.RPCNetwork; +import com.yahoo.messagebus.network.rpc.RPCNetworkParams; +import com.yahoo.cloud.config.SlobroksConfig; + +import java.util.logging.Logger; + +/** + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +public class SharedMessageBus extends AbstractResource { + + private static final Logger log = Logger.getLogger(SharedMessageBus.class.getName()); + private final MessageBus mbus; + + public SharedMessageBus(MessageBus mbus) { + mbus.getClass(); // throws NullPointerException + this.mbus = mbus; + } + + public MessageBus messageBus() { + return mbus; + } + + @Override + protected void destroy() { + log.log(LogLevel.DEBUG, "Destroying shared message bus."); + mbus.destroy(); + } + + public SharedSourceSession newSourceSession(SourceSessionParams params) { + return new SharedSourceSession(this, params); + } + + public SharedIntermediateSession newIntermediateSession(IntermediateSessionParams params) { + return new SharedIntermediateSession(this, params); + } + + public SharedDestinationSession newDestinationSession(DestinationSessionParams params) { + return new SharedDestinationSession(this, params); + } + + public static SharedMessageBus newInstance(MessageBusParams mbusParams, RPCNetworkParams netParams) { + return new SharedMessageBus(new MessageBus(newNetwork(netParams), mbusParams)); + } + + private static Network newNetwork(RPCNetworkParams params) { + SlobroksConfig cfg = params.getSlobroksConfig(); + if (cfg == null) { + cfg = ConfigGetter.getConfig(SlobroksConfig.class, params.getSlobrokConfigId()); + } + if (cfg.slobrok().isEmpty()) { + return new NullNetwork(); // for LocalApplication + } + return new RPCNetwork(params); + } +} diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/SharedSourceSession.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/SharedSourceSession.java new file mode 100644 index 00000000000..2ce76a50b73 --- /dev/null +++ b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/SharedSourceSession.java @@ -0,0 +1,60 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.shared; + +import com.yahoo.jdisc.AbstractResource; +import com.yahoo.jdisc.ResourceReference; +import com.yahoo.log.LogLevel; +import com.yahoo.messagebus.Message; +import com.yahoo.messagebus.Reply; +import com.yahoo.messagebus.ReplyHandler; +import com.yahoo.messagebus.Result; +import com.yahoo.messagebus.SourceSession; +import com.yahoo.messagebus.SourceSessionParams; + +import java.util.logging.Logger; + +/** + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +public class SharedSourceSession extends AbstractResource implements ClientSession, ReplyHandler { + + private static final Logger log = Logger.getLogger(SharedSourceSession.class.getName()); + private final SharedMessageBus mbus; + private final SourceSession session; + private final ResourceReference mbusReference; + + public SharedSourceSession(SharedMessageBus mbus, SourceSessionParams params) { + if (params.getReplyHandler() != null) { + throw new IllegalArgumentException("Reply handler must be null."); + } + this.mbus = mbus; + this.session = mbus.messageBus().createSourceSession(params.setReplyHandler(this)); + this.mbusReference = mbus.refer(); + } + + public SourceSession session() { + return session; + } + + @Override + public Result sendMessage(Message msg) { + return session.send(msg); + } + + public Result sendMessageBlocking(Message msg) throws InterruptedException { + return session.sendBlocking(msg); + } + + @Override + public void handleReply(Reply reply) { + reply.popHandler().handleReply(reply); + } + + @Override + protected void destroy() { + log.log(LogLevel.DEBUG, "Destroying shared source session."); + session.close(); + mbusReference.close(); + } + +} diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/package-info.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/package-info.java new file mode 100644 index 00000000000..3090154a9a6 --- /dev/null +++ b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/package-info.java @@ -0,0 +1,8 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +/** + * Not a public API, exported for use in internal components. + */ +@ExportPackage +package com.yahoo.messagebus.shared; + +import com.yahoo.osgi.annotation.ExportPackage; diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/test/package-info.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/test/package-info.java new file mode 100644 index 00000000000..266365ac747 --- /dev/null +++ b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/test/package-info.java @@ -0,0 +1,5 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +@ExportPackage +package com.yahoo.messagebus.test; + +import com.yahoo.osgi.annotation.ExportPackage; |