diff options
author | gjoranv <gv@verizonmedia.com> | 2021-03-29 21:35:59 +0200 |
---|---|---|
committer | gjoranv <gv@verizonmedia.com> | 2021-03-29 21:37:08 +0200 |
commit | 8a2d6c1c4110ddf508f3a467eeb2ab031998591a (patch) | |
tree | b0590dc285312b39e13b84b65b72164ee244e9af /jdisc_messagebus_service | |
parent | 763b534e158b41224c6cce3e5fd063bcecd48dcb (diff) |
Remove jdisc_messagebus_service and messagebus-disc modules.
- They have been merged into container-messagebus
Diffstat (limited to 'jdisc_messagebus_service')
45 files changed, 0 insertions, 3976 deletions
diff --git a/jdisc_messagebus_service/.gitignore b/jdisc_messagebus_service/.gitignore deleted file mode 100644 index 12251442258..00000000000 --- a/jdisc_messagebus_service/.gitignore +++ /dev/null @@ -1,2 +0,0 @@ -/target -/pom.xml.build diff --git a/jdisc_messagebus_service/OWNERS b/jdisc_messagebus_service/OWNERS deleted file mode 100644 index 78b92e411b4..00000000000 --- a/jdisc_messagebus_service/OWNERS +++ /dev/null @@ -1,2 +0,0 @@ -gjoranv -bjorncs diff --git a/jdisc_messagebus_service/README.md b/jdisc_messagebus_service/README.md deleted file mode 100644 index cecb21de952..00000000000 --- a/jdisc_messagebus_service/README.md +++ /dev/null @@ -1,4 +0,0 @@ -<!-- Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. --> -# JDisc messagebus service - -Messagebus protocol implementation for JDisc. diff --git a/jdisc_messagebus_service/pom.xml b/jdisc_messagebus_service/pom.xml deleted file mode 100644 index 55f8392a5df..00000000000 --- a/jdisc_messagebus_service/pom.xml +++ /dev/null @@ -1,54 +0,0 @@ -<?xml version="1.0"?> -<!-- Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. --> -<project xmlns="http://maven.apache.org/POM/4.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 - http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - <parent> - <groupId>com.yahoo.vespa</groupId> - <artifactId>parent</artifactId> - <version>7-SNAPSHOT</version> - <relativePath>../parent/pom.xml</relativePath> - </parent> - <artifactId>jdisc_messagebus_service</artifactId> - <version>7-SNAPSHOT</version> - <packaging>container-plugin</packaging> - <dependencies> - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>com.yahoo.vespa</groupId> - <artifactId>jdisc_core</artifactId> - <version>${project.version}</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>com.yahoo.vespa</groupId> - <artifactId>component</artifactId> - <version>${project.version}</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>com.yahoo.vespa</groupId> - <artifactId>messagebus</artifactId> - <version>${project.version}</version> - </dependency> - </dependencies> - <build> - <plugins> - <plugin> - <groupId>com.yahoo.vespa</groupId> - <artifactId>bundle-plugin</artifactId> - <extensions>true</extensions> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-compiler-plugin</artifactId> - </plugin> - </plugins> - </build> -</project> diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/IgnoredCompletionHandler.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/IgnoredCompletionHandler.java deleted file mode 100644 index c64fea8653b..00000000000 --- a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/IgnoredCompletionHandler.java +++ /dev/null @@ -1,22 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.messagebus.jdisc; - -import com.yahoo.jdisc.handler.CompletionHandler; - -/** - * @author Simon Thoresen Hult - */ -enum IgnoredCompletionHandler implements CompletionHandler { - - INSTANCE; - - @Override - public void completed() { - - } - - @Override - public void failed(final Throwable t) { - - } -} diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/MbusClient.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/MbusClient.java deleted file mode 100644 index 922e4140868..00000000000 --- a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/MbusClient.java +++ /dev/null @@ -1,147 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.messagebus.jdisc; - -import com.google.inject.Inject; -import com.yahoo.jdisc.AbstractResource; -import com.yahoo.jdisc.Request; -import com.yahoo.jdisc.ResourceReference; -import com.yahoo.jdisc.handler.ContentChannel; -import com.yahoo.jdisc.handler.RequestDeniedException; -import com.yahoo.jdisc.handler.ResponseHandler; -import com.yahoo.jdisc.service.ClientProvider; -import java.util.logging.Level; -import com.yahoo.messagebus.EmptyReply; -import com.yahoo.messagebus.Error; -import com.yahoo.messagebus.ErrorCode; -import com.yahoo.messagebus.Message; -import com.yahoo.messagebus.Reply; -import com.yahoo.messagebus.ReplyHandler; -import com.yahoo.messagebus.shared.ClientSession; - -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.logging.Logger; - -/** - * @author Simon Thoresen Hult - */ -public final class MbusClient extends AbstractResource implements ClientProvider, ReplyHandler { - - private static final Logger log = Logger.getLogger(MbusClient.class.getName()); - private final BlockingQueue<MbusRequest> queue = new LinkedBlockingQueue<>(); - private final ClientSession session; - private final Thread thread = new Thread(new SenderTask(), "MbusClient"); - private volatile boolean done = false; - private final ResourceReference sessionReference; - - @Inject - public MbusClient(ClientSession session) { - this.session = session; - this.sessionReference = session.refer(); - } - - @Override - public void start() { - thread.start(); - } - - @Override - public ContentChannel handleRequest(Request request, ResponseHandler handler) { - if (!(request instanceof MbusRequest)) { - throw new RequestDeniedException(request); - } - final Message msg = ((MbusRequest)request).getMessage(); - msg.getTrace().trace(6, "Request received by MbusClient."); - msg.pushHandler(null); // save user context - final Long timeout = request.timeRemaining(TimeUnit.MILLISECONDS); - if (timeout != null) { - msg.setTimeReceivedNow(); - msg.setTimeRemaining(Math.max(1, timeout)); // negative or zero timeout has semantics - } - msg.setContext(handler); - msg.pushHandler(this); - sendBlocking((MbusRequest)request); - return null; - } - - @Override - public void handleTimeout(Request request, final ResponseHandler handler) { - // ignore, mbus has guaranteed reply - } - - @Override - protected void destroy() { - log.log(Level.FINE, "Destroying message bus client."); - sessionReference.close(); - done = true; - } - - @Override - public void handleReply(final Reply reply) { - reply.getTrace().trace(6, "Reply received by MbusClient."); - final ResponseHandler handler = (ResponseHandler)reply.getContext(); - reply.popHandler(); // restore user context - try { - handler.handleResponse(new MbusResponse(StatusCodes.fromMbusReply(reply), reply)) - .close(IgnoredCompletionHandler.INSTANCE); - } catch (final Exception e) { - log.log(Level.WARNING, "Ignoring exception thrown by ResponseHandler.", e); - } - } - - private void sendBlocking(MbusRequest request) { - while (!sendMessage(request)) { - try { - Thread.sleep(5); - } catch (final InterruptedException e) { - // ignore - } - } - } - - private boolean sendMessage(MbusRequest request) { - Error error; - final Long millis = request.timeRemaining(TimeUnit.MILLISECONDS); - if (millis != null && millis <= 0) { - error = new Error(ErrorCode.TIMEOUT, request.getTimeout(TimeUnit.MILLISECONDS) + " millis"); - } else if (request.isCancelled()) { - error = new Error(ErrorCode.APP_FATAL_ERROR, "request cancelled"); - } else { - try { - error = session.sendMessage(request.getMessage()).getError(); - } catch (final Exception e) { - error = new Error(ErrorCode.FATAL_ERROR, e.toString()); - } - } - if (error == null) { - return true; - } - if (error.isFatal()) { - final Reply reply = new EmptyReply(); - reply.swapState(request.getMessage()); - reply.addError(error); - reply.popHandler().handleReply(reply); - return true; - } - return false; - } - - private class SenderTask implements Runnable { - - @Override - public void run() { - while (!done) { - try { - final MbusRequest request = queue.poll(100, TimeUnit.MILLISECONDS); - if (request == null) { - continue; - } - sendBlocking(request); - } catch (final Exception e) { - log.log(Level.WARNING, "Ignoring exception thrown by MbusClient.", e); - } - } - } - } -} diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/MbusRequest.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/MbusRequest.java deleted file mode 100644 index a0bedd678eb..00000000000 --- a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/MbusRequest.java +++ /dev/null @@ -1,38 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.messagebus.jdisc; - -import com.yahoo.jdisc.Request; -import com.yahoo.jdisc.service.CurrentContainer; -import com.yahoo.messagebus.Message; - -import java.net.URI; - -/** - * @author Simon Thoresen Hult - */ -public class MbusRequest extends Request { - - private final Message message; - - public MbusRequest(CurrentContainer current, URI uri, Message msg) { - super(current, uri); - this.message = validateMessage(msg); - } - - public MbusRequest(Request parent, URI uri, Message msg) { - super(parent, uri); - this.message = validateMessage(msg); - } - - public Message getMessage() { - return message; - } - - private Message validateMessage(Message msg) { - if (msg != null) { - return msg; - } - release(); - throw new NullPointerException(); - } -} diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/MbusRequestHandler.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/MbusRequestHandler.java deleted file mode 100644 index fb5657a9215..00000000000 --- a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/MbusRequestHandler.java +++ /dev/null @@ -1,59 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.messagebus.jdisc; - -import com.yahoo.jdisc.Request; -import com.yahoo.jdisc.handler.AbstractRequestHandler; -import com.yahoo.jdisc.handler.CompletionHandler; -import com.yahoo.jdisc.handler.ContentChannel; -import com.yahoo.jdisc.handler.ResponseHandler; -import com.yahoo.messagebus.Message; -import com.yahoo.messagebus.MessageHandler; -import com.yahoo.messagebus.Reply; -import com.yahoo.messagebus.ReplyHandler; - -/** - * @author Simon Thoresen Hult - */ -public abstract class MbusRequestHandler extends AbstractRequestHandler implements MessageHandler { - - @Override - public ContentChannel handleRequest(final Request request, final ResponseHandler handler) { - if (!(request instanceof MbusRequest)) { - throw new UnsupportedOperationException("Expected MbusRequest, got " + request.getClass().getName() + "."); - } - final Message msg = ((MbusRequest)request).getMessage(); - msg.pushHandler(new RespondingReplyHandler(handler)); - handleMessage(msg); - return null; - } - - private static class RespondingReplyHandler implements ReplyHandler { - - private final ResponseHandler handler; - - RespondingReplyHandler(final ResponseHandler handler) { - this.handler = handler; - } - - @Override - public void handleReply(final Reply reply) { - final MbusResponse response = new MbusResponse(StatusCodes.fromMbusReply(reply), reply); - handler.handleResponse(response).close(IgnoringCompletionHandler.INSTANCE); - } - } - - private static class IgnoringCompletionHandler implements CompletionHandler { - - public static final IgnoringCompletionHandler INSTANCE = new IgnoringCompletionHandler(); - - @Override - public void completed() { - - } - - @Override - public void failed(final Throwable t) { - - } - } -} diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/MbusResponse.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/MbusResponse.java deleted file mode 100644 index 37da4d8569f..00000000000 --- a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/MbusResponse.java +++ /dev/null @@ -1,25 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.messagebus.jdisc; - -import com.yahoo.jdisc.Response; -import com.yahoo.messagebus.Reply; - -/** - * @author Simon Thoresen Hult - */ -public class MbusResponse extends Response { - - private final Reply reply; - - public MbusResponse(int status, Reply reply) { - super(status); - if (reply == null) { - throw new NullPointerException(); - } - this.reply = reply; - } - - public Reply getReply() { - return reply; - } -} diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/MbusServer.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/MbusServer.java deleted file mode 100644 index e26e1e7e134..00000000000 --- a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/MbusServer.java +++ /dev/null @@ -1,135 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.messagebus.jdisc; - -import com.google.inject.Inject; -import com.yahoo.jdisc.AbstractResource; -import com.yahoo.jdisc.Request; -import com.yahoo.jdisc.ResourceReference; -import com.yahoo.jdisc.Response; -import com.yahoo.jdisc.handler.ContentChannel; -import com.yahoo.jdisc.handler.ResponseHandler; -import com.yahoo.jdisc.service.CurrentContainer; -import com.yahoo.jdisc.service.ServerProvider; -import java.util.logging.Level; -import com.yahoo.messagebus.*; -import com.yahoo.messagebus.Error; -import com.yahoo.messagebus.shared.ServerSession; - -import java.net.URI; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.logging.Logger; - -/** - * @author Simon Thoresen Hult - */ -public final class MbusServer extends AbstractResource implements ServerProvider, MessageHandler { - - private final static Logger log = Logger.getLogger(MbusServer.class.getName()); - private final AtomicBoolean running = new AtomicBoolean(false); - private final CurrentContainer container; - private final ServerSession session; - private final URI uri; - private final ResourceReference sessionReference; - - @Inject - public MbusServer(CurrentContainer container, ServerSession session) { - this.container = container; - this.session = session; - uri = URI.create("mbus://localhost/" + session.name()); - session.setMessageHandler(this); - sessionReference = session.refer(); - } - - @Override - public void start() { - log.log(Level.FINE, "Starting message bus server."); - running.set(true); - } - - @Override - public void close() { - log.log(Level.FINE, "Closing message bus server."); - running.set(false); - } - - @Override - protected void destroy() { - log.log(Level.FINE, "Destroying message bus server."); - running.set(false); - sessionReference.close(); - } - - @Override - public void handleMessage(Message msg) { - if (!running.get()) { - dispatchErrorReply(msg, ErrorCode.SESSION_BUSY, "Session temporarily closed."); - return; - } - if (msg.getTrace().shouldTrace(6)) { - msg.getTrace().trace(6, "Message received by MbusServer."); - } - Request request = null; - ContentChannel content = null; - try { - request = new MbusRequest(container, uri, msg); - content = request.connect(new ServerResponseHandler(msg)); - } catch (RuntimeException e) { - dispatchErrorReply(msg, ErrorCode.APP_FATAL_ERROR, e.toString()); - } finally { - if (request != null) { - request.release(); - } - } - if (content != null) { - content.close(IgnoredCompletionHandler.INSTANCE); - } - } - - public String connectionSpec() { - return session.connectionSpec(); - } - - private void dispatchErrorReply(Message msg, int errCode, String errMsg) { - Reply reply = new EmptyReply(); - reply.swapState(msg); - reply.addError(new Error(errCode, errMsg)); - session.sendReply(reply); - } - - private class ServerResponseHandler implements ResponseHandler { - - final Message msg; - - ServerResponseHandler(Message msg) { - this.msg = msg; - } - - @Override - public ContentChannel handleResponse(Response response) { - Reply reply; - if (response instanceof MbusResponse) { - reply = ((MbusResponse)response).getReply(); - } else { - reply = new EmptyReply(); - reply.swapState(msg); - } - Error err = StatusCodes.toMbusError(response.getStatus()); - if (err != null) { - if (err.isFatal()) { - if (!reply.hasFatalErrors()) { - reply.addError(err); - } - } else { - if (!reply.hasErrors()) { - reply.addError(err); - } - } - } - if (reply.getTrace().shouldTrace(6)) { - reply.getTrace().trace(6, "Sending reply from MbusServer."); - } - session.sendReply(reply); - return null; - } - } -} diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/StatusCodes.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/StatusCodes.java deleted file mode 100644 index 6570c910af3..00000000000 --- a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/StatusCodes.java +++ /dev/null @@ -1,77 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.messagebus.jdisc; - -import com.yahoo.jdisc.Response; -import com.yahoo.messagebus.Error; -import com.yahoo.messagebus.ErrorCode; -import com.yahoo.messagebus.Reply; - -/** - * @author Simon Thoresen Hult - */ -public class StatusCodes { - - public static int fromMbusReply(final Reply reply) { - int statusCode = Response.Status.OK; - for (int i = 0, len = reply.getNumErrors(); i < len; ++i) { - statusCode = Math.max(statusCode, fromMbusError(reply.getError(i))); - } - return statusCode; - } - - public static int fromMbusError(final Error error) { - final int errorCode = error.getCode(); - if (errorCode < ErrorCode.TRANSIENT_ERROR) { - return Response.Status.OK; - } - if (errorCode < ErrorCode.FATAL_ERROR) { - return Response.Status.TEMPORARY_REDIRECT; - } - switch (errorCode) { - case ErrorCode.SEND_QUEUE_CLOSED: - return Response.Status.LOCKED; - case ErrorCode.ILLEGAL_ROUTE: - return Response.Status.BAD_REQUEST; - case ErrorCode.NO_SERVICES_FOR_ROUTE: - return Response.Status.NOT_FOUND; - case ErrorCode.ENCODE_ERROR: - return Response.Status.BAD_REQUEST; - case ErrorCode.NETWORK_ERROR: - return Response.Status.BAD_REQUEST; // got nothing better - case ErrorCode.UNKNOWN_PROTOCOL: - return Response.Status.UNSUPPORTED_MEDIA_TYPE; - case ErrorCode.DECODE_ERROR: - return Response.Status.UNSUPPORTED_MEDIA_TYPE; - case ErrorCode.TIMEOUT: - return Response.Status.REQUEST_TIMEOUT; - case ErrorCode.INCOMPATIBLE_VERSION: - return Response.Status.VERSION_NOT_SUPPORTED; - case ErrorCode.UNKNOWN_POLICY: - return Response.Status.BAD_REQUEST; - case ErrorCode.NETWORK_SHUTDOWN: - return Response.Status.LOCKED; - case ErrorCode.POLICY_ERROR: - return Response.Status.PRECONDITION_FAILED; - case ErrorCode.SEQUENCE_ERROR: - return Response.Status.PRECONDITION_FAILED; - case ErrorCode.APP_FATAL_ERROR: - return Response.Status.INTERNAL_SERVER_ERROR; - default: - return Response.Status.INTERNAL_SERVER_ERROR; - } - } - - public static Error toMbusError(final int statusCode) { - if (statusCode < 300) { - return null; - } else if (statusCode < 400) { - return new Error(ErrorCode.APP_TRANSIENT_ERROR, statusCode + " Redirection"); - } else if (statusCode < 500) { - return new Error(ErrorCode.APP_FATAL_ERROR, statusCode + " Client Error"); - } else if (statusCode < 600) { - return new Error(ErrorCode.APP_FATAL_ERROR, statusCode + " Server Error"); - } else { - return new Error(ErrorCode.APP_FATAL_ERROR, statusCode + " Unknown Error"); - } - } -} diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/package-info.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/package-info.java deleted file mode 100644 index 9aea8cf7db8..00000000000 --- a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/package-info.java +++ /dev/null @@ -1,5 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -@ExportPackage -package com.yahoo.messagebus.jdisc; - -import com.yahoo.osgi.annotation.ExportPackage; diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/ClientTestDriver.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/ClientTestDriver.java deleted file mode 100644 index 111805d61b0..00000000000 --- a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/ClientTestDriver.java +++ /dev/null @@ -1,134 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.messagebus.jdisc.test; - -import com.yahoo.jdisc.References; -import com.yahoo.jdisc.Request; -import com.yahoo.jdisc.ResourceReference; -import com.yahoo.jdisc.application.ContainerBuilder; -import com.yahoo.jdisc.handler.ResponseHandler; -import com.yahoo.jdisc.test.TestDriver; -import com.yahoo.messagebus.*; -import com.yahoo.messagebus.jdisc.MbusClient; -import com.yahoo.messagebus.jdisc.MbusRequest; -import com.yahoo.messagebus.network.rpc.RPCNetworkParams; -import com.yahoo.messagebus.routing.Route; -import com.yahoo.messagebus.shared.SharedMessageBus; -import com.yahoo.messagebus.shared.SharedSourceSession; -import com.yahoo.messagebus.test.SimpleProtocol; - -import java.net.URI; -import java.util.concurrent.TimeUnit; - -/** - * @author Simon Thoresen Hult - */ -public class ClientTestDriver { - - private final RemoteServer server; - private final MbusClient client; - private final SharedSourceSession session; - private final TestDriver driver; - - private ClientTestDriver(RemoteServer server, Protocol protocol) { - this.server = server; - - MessageBusParams mbusParams = new MessageBusParams().addProtocol(protocol); - RPCNetworkParams netParams = new RPCNetworkParams().setSlobrokConfigId(server.slobrokId()); - SharedMessageBus mbus = SharedMessageBus.newInstance(mbusParams, netParams); - session = mbus.newSourceSession(new SourceSessionParams()); - client = new MbusClient(session); - client.start(); - mbus.release(); - - driver = TestDriver.newSimpleApplicationInstanceWithoutOsgi(); - ContainerBuilder builder = driver.newContainerBuilder(); - builder.clientBindings().bind("mbus://*/*", client); - driver.activateContainer(builder); - } - - public SourceSession sourceSession() { - return session.session(); - } - - public Request newServerRequest() { - return new Request(driver, URI.create("mbus://localhost/")); - } - - public Request newClientRequest(Message msg) { - msg.setRoute(Route.parse(server.connectionSpec())); - if (msg.getTrace().getLevel() == 0) { - msg.getTrace().setLevel(9); - } - final Request parent = newServerRequest(); - try (final ResourceReference ref = References.fromResource(parent)) { - return new MbusRequest(parent, URI.create("mbus://remotehost/"), msg); - } - } - - public boolean sendRequest(Request request, ResponseHandler responseHandler) { - request.connect(responseHandler).close(null); - return true; - } - - public boolean sendMessage(Message msg, ResponseHandler responseHandler) { - final Request request = newClientRequest(msg); - try (final ResourceReference ref = References.fromResource(request)) { - return sendRequest(request, responseHandler); - } - } - - public Message awaitMessage() { - Message msg = null; - try { - msg = server.awaitMessage(60, TimeUnit.SECONDS); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - if (msg != null) { - msg.getTrace().trace(0, "Message received by RemoteServer."); - } - return msg; - } - - public void sendReply(Reply reply) { - reply.getTrace().trace(0, "Sending reply from RemoteServer."); - server.sendReply(reply); - } - - public boolean awaitMessageAndSendReply(Reply reply) { - Message msg = awaitMessage(); - if (msg == null) { - return false; - } - reply.swapState(msg); - sendReply(reply); - return true; - } - - public boolean close() { - session.release(); - client.release(); - server.close(); - return driver.close(); - } - - public MbusClient client() { - return client; - } - - public RemoteServer server() { - return server; - } - - public static ClientTestDriver newInstance() { - return new ClientTestDriver(RemoteServer.newInstanceWithInternSlobrok(), new SimpleProtocol()); - } - - public static ClientTestDriver newInstanceWithProtocol(Protocol protocol) { - return new ClientTestDriver(RemoteServer.newInstanceWithInternSlobrok(), protocol); - } - - public static ClientTestDriver newInstanceWithExternSlobrok(String slobrokId) { - return new ClientTestDriver(RemoteServer.newInstanceWithExternSlobrok(slobrokId), new SimpleProtocol()); - } -} diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/MessageQueue.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/MessageQueue.java deleted file mode 100644 index c5287165e27..00000000000 --- a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/MessageQueue.java +++ /dev/null @@ -1,27 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.messagebus.jdisc.test; - -import com.yahoo.messagebus.Message; -import com.yahoo.messagebus.MessageHandler; - -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; - -/** - * @author Simon Thoresen Hult - */ -public class MessageQueue implements MessageHandler { - - private final BlockingQueue<Message> queue = new LinkedBlockingQueue<>(); - - @Override - public void handleMessage(Message msg) { - queue.add(msg); - } - - public Message awaitMessage(int timeout, TimeUnit unit) throws InterruptedException { - return queue.poll(timeout, unit); - } - -} diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteClient.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteClient.java deleted file mode 100644 index 57d0abd980b..00000000000 --- a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteClient.java +++ /dev/null @@ -1,76 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.messagebus.jdisc.test; - -import com.yahoo.jrt.ListenFailedException; -import com.yahoo.jrt.slobrok.server.Slobrok; -import com.yahoo.messagebus.*; -import com.yahoo.messagebus.network.local.LocalNetwork; -import com.yahoo.messagebus.network.rpc.RPCNetwork; -import com.yahoo.messagebus.network.rpc.RPCNetworkParams; -import com.yahoo.messagebus.test.SimpleProtocol; - -import java.util.concurrent.TimeUnit; - -/** - * @author Simon Thoresen Hult - */ -public class RemoteClient { - - private final Slobrok slobrok; - private final String slobrokId; - private final MessageBus mbus; - private final ReplyQueue queue = new ReplyQueue(); - private final SourceSession session; - - private RemoteClient(Slobrok slobrok, String slobrokId, Protocol protocol, boolean network) { - this.slobrok = slobrok; - this.slobrokId = slobrok != null ? slobrok.configId() : slobrokId; - mbus = network - ? new MessageBus(new RPCNetwork(new RPCNetworkParams().setSlobrokConfigId(this.slobrokId)), - new MessageBusParams().addProtocol(protocol)) - : new MessageBus(new LocalNetwork(), new MessageBusParams().addProtocol(protocol)); - session = mbus.createSourceSession(new SourceSessionParams().setThrottlePolicy(null).setReplyHandler(queue)); - } - - public Result sendMessage(Message msg) { - return session.send(msg); - } - - public Reply awaitReply(int timeout, TimeUnit unit) throws InterruptedException { - return queue.awaitReply(timeout, unit); - } - - public String slobrokId() { - return slobrokId; - } - - public void close() { - session.destroy(); - mbus.destroy(); - if (slobrok != null) { - slobrok.stop(); - } - } - - public static RemoteClient newInstanceWithInternSlobrok(boolean network) { - return new RemoteClient(newSlobrok(), null, new SimpleProtocol(), network); - } - - public static RemoteClient newInstanceWithExternSlobrok(String slobrokId, boolean network) { - return new RemoteClient(null, slobrokId, new SimpleProtocol(), network); - } - - public static RemoteClient newInstanceWithProtocolAndInternSlobrok(Protocol protocol, boolean network) { - return new RemoteClient(newSlobrok(), null, protocol, network); - } - - private static Slobrok newSlobrok() { - Slobrok slobrok; - try { - slobrok = new Slobrok(); - } catch (ListenFailedException e) { - throw new IllegalStateException(e); - } - return slobrok; - } -} diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteServer.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteServer.java deleted file mode 100644 index 1f0f82c4903..00000000000 --- a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteServer.java +++ /dev/null @@ -1,87 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.messagebus.jdisc.test; - -import com.yahoo.jrt.ListenFailedException; -import com.yahoo.jrt.slobrok.server.Slobrok; -import com.yahoo.messagebus.*; -import com.yahoo.messagebus.network.Identity; -import com.yahoo.messagebus.network.rpc.RPCNetwork; -import com.yahoo.messagebus.network.rpc.RPCNetworkParams; -import com.yahoo.messagebus.test.SimpleProtocol; - -import java.util.concurrent.TimeUnit; - -/** - * @author Simon Thoresen Hult - */ -public class RemoteServer { - - private final Slobrok slobrok; - private final String slobrokId; - private final MessageBus mbus; - private final MessageQueue queue = new MessageQueue(); - private final DestinationSession session; - - private RemoteServer(Slobrok slobrok, String slobrokId, Protocol protocol, String identity) { - this.slobrok = slobrok; - this.slobrokId = slobrok != null ? slobrok.configId() : slobrokId; - mbus = new MessageBus(new RPCNetwork(new RPCNetworkParams() - .setSlobrokConfigId(this.slobrokId) - .setIdentity(new Identity(identity))), - new MessageBusParams().addProtocol(protocol)); - session = mbus.createDestinationSession(new DestinationSessionParams().setMessageHandler(queue)); - } - - public String connectionSpec() { - return session.getConnectionSpec(); - } - - public Message awaitMessage(int timeout, TimeUnit unit) throws InterruptedException { - return queue.awaitMessage(timeout, unit); - } - - public void ackMessage(Message msg) { - session.acknowledge(msg); - } - - public void sendReply(Reply reply) { - session.reply(reply); - } - - public String slobrokId() { - return slobrokId; - } - - public void close() { - session.destroy(); - mbus.destroy(); - if (slobrok != null) { - slobrok.stop(); - } - } - - public static RemoteServer newInstanceWithInternSlobrok() { - return new RemoteServer(newSlobrok(), null, new SimpleProtocol(), "remote"); - } - - public static RemoteServer newInstanceWithExternSlobrok(String slobrokId) { - return new RemoteServer(null, slobrokId, new SimpleProtocol(), "remote"); - } - - public static RemoteServer newInstance(String slobrokId, String identity, Protocol protocol) { - return new RemoteServer(null, slobrokId, protocol, identity); - } - - public static RemoteServer newInstanceWithProtocol(Protocol protocol) { - return new RemoteServer(newSlobrok(), null, protocol, "remote"); - } - - private static Slobrok newSlobrok() { - try { - return new Slobrok(); - } catch (ListenFailedException e) { - throw new IllegalStateException(e); - } - } - -} diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/ReplyQueue.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/ReplyQueue.java deleted file mode 100644 index 6c48aab5a7f..00000000000 --- a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/ReplyQueue.java +++ /dev/null @@ -1,26 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.messagebus.jdisc.test; - -import com.yahoo.messagebus.Reply; -import com.yahoo.messagebus.ReplyHandler; - -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; - -/** - * @author Simon Thoresen Hult - */ -public class ReplyQueue implements ReplyHandler { - - private final BlockingQueue<Reply> queue = new LinkedBlockingQueue<>(); - - @Override - public void handleReply(Reply reply) { - queue.add(reply); - } - - public Reply awaitReply(int timeout, TimeUnit unit) throws InterruptedException { - return queue.poll(timeout, unit); - } -} diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/ServerTestDriver.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/ServerTestDriver.java deleted file mode 100644 index e59db28e886..00000000000 --- a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/ServerTestDriver.java +++ /dev/null @@ -1,155 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.messagebus.jdisc.test; - -import com.google.inject.Module; -import com.yahoo.jdisc.application.ContainerBuilder; -import com.yahoo.jdisc.handler.RequestHandler; -import com.yahoo.jdisc.test.TestDriver; -import com.yahoo.messagebus.*; -import com.yahoo.messagebus.Error; -import com.yahoo.messagebus.jdisc.MbusServer; -import com.yahoo.messagebus.network.rpc.RPCNetworkParams; -import com.yahoo.messagebus.routing.Route; -import com.yahoo.messagebus.shared.ServerSession; -import com.yahoo.messagebus.shared.SharedMessageBus; -import com.yahoo.messagebus.test.SimpleProtocol; - -import java.util.Arrays; -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.TimeUnit; - -/** - * @author Simon Thoresen Hult - */ -public class ServerTestDriver { - - private final RemoteClient client; - private final MbusServer server; - private final TestDriver driver; - - private ServerTestDriver(RemoteClient client, boolean activateContainer, RequestHandler requestHandler, - Protocol protocol, Module... guiceModules) - { - this.client = client; - driver = TestDriver.newSimpleApplicationInstanceWithoutOsgi(guiceModules); - if (activateContainer) { - ContainerBuilder builder = driver.newContainerBuilder(); - if (requestHandler != null) { - builder.serverBindings().bind("mbus://*/*", requestHandler); - } - driver.activateContainer(builder); - } - - MessageBusParams mbusParams = new MessageBusParams().addProtocol(protocol); - RPCNetworkParams netParams = new RPCNetworkParams().setSlobrokConfigId(client.slobrokId()); - SharedMessageBus mbus = SharedMessageBus.newInstance(mbusParams, netParams); - ServerSession session = mbus.newDestinationSession(new DestinationSessionParams()); - server = new MbusServer(driver, session); - server.start(); - session.release(); - mbus.release(); - } - - public boolean sendMessage(Message msg) { - msg.setRoute(Route.parse(server.connectionSpec())); - msg.getTrace().setLevel(9); - return client.sendMessage(msg).isAccepted(); - } - - public Reply awaitReply() { - Reply reply = null; - try { - reply = client.awaitReply(60, TimeUnit.SECONDS); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - if (reply != null) { - System.out.println(reply.getTrace()); - } - return reply; - } - - public Reply awaitSuccess() { - Reply reply = awaitReply(); - if (reply == null || reply.hasErrors()) { - return null; - } - return reply; - } - - public Reply awaitErrors(Integer... errCodes) { - Reply reply = awaitReply(); - if (reply == null) { - return null; - } - List<Integer> lst = new LinkedList<>(Arrays.asList(errCodes)); - for (int i = 0, len = reply.getNumErrors(); i < len; ++i) { - Error err = reply.getError(i); - System.out.println(err); - int idx = lst.indexOf(err.getCode()); - if (idx < 0) { - return null; - } - lst.remove(idx); - } - if (!lst.isEmpty()) { - return null; - } - return reply; - } - - public boolean close() { - server.close(); - server.release(); - client.close(); - return driver.close(); - } - - public TestDriver parent() { - return driver; - } - - public RemoteClient client() { - return client; - } - - public MbusServer server() { - return server; - } - - public static ServerTestDriver newInstance(RequestHandler requestHandler, boolean network, Module... guiceModules) { - return new ServerTestDriver(RemoteClient.newInstanceWithInternSlobrok(network), true, requestHandler, - new SimpleProtocol(), guiceModules); - } - - public static ServerTestDriver newInstanceWithProtocol(Protocol protocol, RequestHandler requestHandler, - boolean network, Module... guiceModules) - { - return new ServerTestDriver(RemoteClient.newInstanceWithInternSlobrok(network), true, requestHandler, protocol, - guiceModules); - } - - public static ServerTestDriver newInstanceWithExternSlobrok(String slobrokId, RequestHandler requestHandler, - boolean network, Module... guiceModules) - { - return new ServerTestDriver(RemoteClient.newInstanceWithExternSlobrok(slobrokId, network), - true, requestHandler, new SimpleProtocol(), guiceModules); - } - - public static ServerTestDriver newInactiveInstance(boolean network, Module... guiceModules) { - return new ServerTestDriver(RemoteClient.newInstanceWithInternSlobrok(network), false, null, - new SimpleProtocol(), guiceModules); - } - - public static ServerTestDriver newInactiveInstanceWithProtocol(Protocol protocol, boolean network, Module... guiceModules) { - return new ServerTestDriver(RemoteClient.newInstanceWithProtocolAndInternSlobrok(protocol, network), false, null, - protocol, guiceModules); - } - - public static ServerTestDriver newUnboundInstance(boolean network, Module... guiceModules) { - return new ServerTestDriver(RemoteClient.newInstanceWithInternSlobrok(network), true, null, - new SimpleProtocol(), guiceModules); - } - -} diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/network/package-info.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/network/package-info.java deleted file mode 100644 index 72f563e8bd7..00000000000 --- a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/network/package-info.java +++ /dev/null @@ -1,5 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -@ExportPackage -package com.yahoo.messagebus.network; - -import com.yahoo.osgi.annotation.ExportPackage; diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/network/rpc/package-info.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/network/rpc/package-info.java deleted file mode 100644 index 7b468813713..00000000000 --- a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/network/rpc/package-info.java +++ /dev/null @@ -1,5 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -@ExportPackage -package com.yahoo.messagebus.network.rpc; - -import com.yahoo.osgi.annotation.ExportPackage; diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/package-info.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/package-info.java deleted file mode 100644 index 63b713e70e0..00000000000 --- a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/package-info.java +++ /dev/null @@ -1,5 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -@ExportPackage -package com.yahoo.messagebus; - -import com.yahoo.osgi.annotation.ExportPackage; diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/routing/package-info.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/routing/package-info.java deleted file mode 100644 index ba8fc5fafba..00000000000 --- a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/routing/package-info.java +++ /dev/null @@ -1,5 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -@ExportPackage -package com.yahoo.messagebus.routing; - -import com.yahoo.osgi.annotation.ExportPackage; diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/ClientSession.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/ClientSession.java deleted file mode 100644 index 0964a254cf2..00000000000 --- a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/ClientSession.java +++ /dev/null @@ -1,14 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.messagebus.shared; - -import com.yahoo.jdisc.SharedResource; -import com.yahoo.messagebus.Message; -import com.yahoo.messagebus.Result; - -/** - * @author Simon Thoresen Hult - */ -public interface ClientSession extends SharedResource { - - public Result sendMessage(Message msg); -} diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/NullNetwork.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/NullNetwork.java deleted file mode 100644 index ad58d6b9a5e..00000000000 --- a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/NullNetwork.java +++ /dev/null @@ -1,73 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.messagebus.shared; - -import com.yahoo.jrt.slobrok.api.IMirror; -import com.yahoo.messagebus.Message; -import com.yahoo.messagebus.network.Network; -import com.yahoo.messagebus.network.NetworkOwner; -import com.yahoo.messagebus.routing.RoutingNode; - -import java.util.List; - -/** - * <p>Used by SharedMessageBus as a network when the container runs in LocalApplication with no network services.</p> - * - * @author <a href="mailto:vegardh@yahoo-inc.com">Vegard Havdal</a> - */ -class NullNetwork implements Network { - - @Override - public boolean waitUntilReady(double seconds) { - return true; - } - - @Override - public void attach(NetworkOwner owner) { - - } - - @Override - public void registerSession(String session) { - - } - - @Override - public void unregisterSession(String session) { - - } - - @Override - public boolean allocServiceAddress(RoutingNode recipient) { - return false; - } - - @Override - public void freeServiceAddress(RoutingNode recipient) { - - } - - @Override - public void send(Message msg, List<RoutingNode> recipients) { - - } - - @Override - public void sync() { - - } - - @Override - public void shutdown() { - - } - - @Override - public String getConnectionSpec() { - return null; - } - - @Override - public IMirror getMirror() { - return null; - } -} diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/ServerSession.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/ServerSession.java deleted file mode 100644 index 56713815c7a..00000000000 --- a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/ServerSession.java +++ /dev/null @@ -1,22 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.messagebus.shared; - -import com.yahoo.jdisc.SharedResource; -import com.yahoo.messagebus.MessageHandler; -import com.yahoo.messagebus.Reply; - -/** - * @author Simon Thoresen Hult - */ -public interface ServerSession extends SharedResource { - - public MessageHandler getMessageHandler(); - - public void setMessageHandler(MessageHandler msgHandler); - - public void sendReply(Reply reply); - - public String connectionSpec(); - - public String name(); -} diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/SharedDestinationSession.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/SharedDestinationSession.java deleted file mode 100644 index 7da164757cd..00000000000 --- a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/SharedDestinationSession.java +++ /dev/null @@ -1,85 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.messagebus.shared; - -import com.yahoo.jdisc.AbstractResource; -import com.yahoo.jdisc.ResourceReference; -import java.util.logging.Level; -import com.yahoo.messagebus.DestinationSession; -import com.yahoo.messagebus.DestinationSessionParams; -import com.yahoo.messagebus.EmptyReply; -import com.yahoo.messagebus.Error; -import com.yahoo.messagebus.ErrorCode; -import com.yahoo.messagebus.Message; -import com.yahoo.messagebus.MessageHandler; -import com.yahoo.messagebus.Reply; - -import java.util.concurrent.atomic.AtomicReference; -import java.util.logging.Logger; - -/** - * @author Simon Thoresen Hult - */ -public class SharedDestinationSession extends AbstractResource implements MessageHandler, ServerSession { - - private static final Logger log = Logger.getLogger(SharedDestinationSession.class.getName()); - private final AtomicReference<MessageHandler> msgHandler = new AtomicReference<>(); - private final DestinationSession session; - private final ResourceReference mbusReference; - - SharedDestinationSession(SharedMessageBus mbus, DestinationSessionParams params) { - this.msgHandler.set(params.getMessageHandler()); - this.session = mbus.messageBus().createDestinationSession(params.setMessageHandler(this)); - this.mbusReference = mbus.refer(); - } - - public DestinationSession session() { - return session; - } - - @Override - public void sendReply(Reply reply) { - session.reply(reply); - } - - @Override - public MessageHandler getMessageHandler() { - return msgHandler.get(); - } - - @Override - public void setMessageHandler(MessageHandler msgHandler) { - if (!this.msgHandler.compareAndSet(null, msgHandler)) { - throw new IllegalStateException("Message handler already registered."); - } - } - - @Override - public void handleMessage(Message msg) { - MessageHandler msgHandler = this.msgHandler.get(); - if (msgHandler == null) { - Reply reply = new EmptyReply(); - reply.swapState(msg); - reply.addError(new Error(ErrorCode.SESSION_BUSY, "Session not fully configured yet.")); - sendReply(reply); - return; - } - msgHandler.handleMessage(msg); - } - - @Override - public String connectionSpec() { - return session.getConnectionSpec(); - } - - @Override - public String name() { - return session.getName(); - } - - @Override - protected void destroy() { - log.log(Level.FINE, "Destroying shared destination session."); - session.destroy(); - mbusReference.close(); - } -} diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/SharedIntermediateSession.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/SharedIntermediateSession.java deleted file mode 100644 index 5c9fab46e34..00000000000 --- a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/SharedIntermediateSession.java +++ /dev/null @@ -1,104 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.messagebus.shared; - -import com.yahoo.jdisc.AbstractResource; -import com.yahoo.jdisc.ResourceReference; -import java.util.logging.Level; -import com.yahoo.messagebus.EmptyReply; -import com.yahoo.messagebus.Error; -import com.yahoo.messagebus.ErrorCode; -import com.yahoo.messagebus.IntermediateSession; -import com.yahoo.messagebus.IntermediateSessionParams; -import com.yahoo.messagebus.Message; -import com.yahoo.messagebus.MessageHandler; -import com.yahoo.messagebus.Reply; -import com.yahoo.messagebus.ReplyHandler; -import com.yahoo.messagebus.Result; - -import java.util.concurrent.atomic.AtomicReference; -import java.util.logging.Logger; - -/** - * @author Simon Thoresen Hult - */ -public class SharedIntermediateSession extends AbstractResource - implements ClientSession, ServerSession, MessageHandler, ReplyHandler -{ - - private static final Logger log = Logger.getLogger(SharedIntermediateSession.class.getName()); - private final AtomicReference<MessageHandler> msgHandler = new AtomicReference<>(); - private final IntermediateSession session; - private final ResourceReference mbusReference; - - public SharedIntermediateSession(SharedMessageBus mbus, IntermediateSessionParams params) { - if (params.getReplyHandler() != null) { - throw new IllegalArgumentException("Reply handler must be null."); - } - this.msgHandler.set(params.getMessageHandler()); - this.session = mbus.messageBus().createIntermediateSession(params.setReplyHandler(this) - .setMessageHandler(this)); - this.mbusReference = mbus.refer(); - } - - public IntermediateSession session() { - return session; - } - - @Override - public Result sendMessage(Message msg) { - session.forward(msg); - return Result.ACCEPTED; - } - - @Override - public void sendReply(Reply reply) { - session.forward(reply); - } - - @Override - public MessageHandler getMessageHandler() { - return msgHandler.get(); - } - - @Override - public void setMessageHandler(MessageHandler msgHandler) { - if (!this.msgHandler.compareAndSet(null, msgHandler)) { - throw new IllegalStateException("Message handler already registered."); - } - } - - @Override - public void handleMessage(Message msg) { - MessageHandler msgHandler = this.msgHandler.get(); - if (msgHandler == null) { - Reply reply = new EmptyReply(); - reply.swapState(msg); - reply.addError(new Error(ErrorCode.SESSION_BUSY, "Session not fully configured yet.")); - sendReply(reply); - return; - } - msgHandler.handleMessage(msg); - } - - @Override - public void handleReply(Reply reply) { - reply.popHandler().handleReply(reply); - } - - @Override - public String connectionSpec() { - return session.getConnectionSpec(); - } - - @Override - public String name() { - return session.getName(); - } - - @Override - protected void destroy() { - log.log(Level.FINE, "Destroying shared intermediate session."); - session.destroy(); - mbusReference.close(); - } -} diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/SharedMessageBus.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/SharedMessageBus.java deleted file mode 100644 index dd135a51378..00000000000 --- a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/SharedMessageBus.java +++ /dev/null @@ -1,68 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.messagebus.shared; - -import com.yahoo.config.subscription.ConfigGetter; -import com.yahoo.jdisc.AbstractResource; -import java.util.logging.Level; -import com.yahoo.messagebus.DestinationSessionParams; -import com.yahoo.messagebus.IntermediateSessionParams; -import com.yahoo.messagebus.MessageBus; -import com.yahoo.messagebus.MessageBusParams; -import com.yahoo.messagebus.SourceSessionParams; -import com.yahoo.messagebus.network.Network; -import com.yahoo.messagebus.network.rpc.RPCNetwork; -import com.yahoo.messagebus.network.rpc.RPCNetworkParams; -import com.yahoo.cloud.config.SlobroksConfig; - -import java.util.logging.Logger; - -/** - * @author Simon Thoresen Hult - */ -public class SharedMessageBus extends AbstractResource { - - private static final Logger log = Logger.getLogger(SharedMessageBus.class.getName()); - private final MessageBus mbus; - - public SharedMessageBus(MessageBus mbus) { - mbus.getClass(); // throws NullPointerException - this.mbus = mbus; - } - - public MessageBus messageBus() { - return mbus; - } - - @Override - protected void destroy() { - log.log(Level.FINE, "Destroying shared message bus."); - mbus.destroy(); - } - - public SharedSourceSession newSourceSession(SourceSessionParams params) { - return new SharedSourceSession(this, params); - } - - public SharedIntermediateSession newIntermediateSession(IntermediateSessionParams params) { - return new SharedIntermediateSession(this, params); - } - - public SharedDestinationSession newDestinationSession(DestinationSessionParams params) { - return new SharedDestinationSession(this, params); - } - - public static SharedMessageBus newInstance(MessageBusParams mbusParams, RPCNetworkParams netParams) { - return new SharedMessageBus(new MessageBus(newNetwork(netParams), mbusParams)); - } - - private static Network newNetwork(RPCNetworkParams params) { - SlobroksConfig cfg = params.getSlobroksConfig(); - if (cfg == null) { - cfg = ConfigGetter.getConfig(SlobroksConfig.class, params.getSlobrokConfigId()); - } - if (cfg.slobrok().isEmpty()) { - return new NullNetwork(); // for LocalApplication - } - return new RPCNetwork(params); - } -} diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/SharedSourceSession.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/SharedSourceSession.java deleted file mode 100644 index 56071682349..00000000000 --- a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/SharedSourceSession.java +++ /dev/null @@ -1,58 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.messagebus.shared; - -import com.yahoo.jdisc.AbstractResource; -import com.yahoo.jdisc.ResourceReference; -import java.util.logging.Level; -import com.yahoo.messagebus.Message; -import com.yahoo.messagebus.Reply; -import com.yahoo.messagebus.ReplyHandler; -import com.yahoo.messagebus.Result; -import com.yahoo.messagebus.SourceSession; -import com.yahoo.messagebus.SourceSessionParams; - -import java.util.logging.Logger; - -/** - * @author Simon Thoresen Hult - */ -public class SharedSourceSession extends AbstractResource implements ClientSession, ReplyHandler { - - private static final Logger log = Logger.getLogger(SharedSourceSession.class.getName()); - private final SourceSession session; - private final ResourceReference mbusReference; - - public SharedSourceSession(SharedMessageBus mbus, SourceSessionParams params) { - if (params.getReplyHandler() != null) { - throw new IllegalArgumentException("Reply handler must be null."); - } - this.session = mbus.messageBus().createSourceSession(params.setReplyHandler(this)); - this.mbusReference = mbus.refer(); - } - - public SourceSession session() { - return session; - } - - @Override - public Result sendMessage(Message msg) { - return session.send(msg); - } - - public Result sendMessageBlocking(Message msg) throws InterruptedException { - return session.sendBlocking(msg); - } - - @Override - public void handleReply(Reply reply) { - reply.popHandler().handleReply(reply); - } - - @Override - protected void destroy() { - log.log(Level.FINE, "Destroying shared source session."); - session.close(); - mbusReference.close(); - } - -} diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/package-info.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/package-info.java deleted file mode 100644 index 941a0dc4c5c..00000000000 --- a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/package-info.java +++ /dev/null @@ -1,8 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -/** - * Not a public API, exported for use in internal components. - */ -@ExportPackage -package com.yahoo.messagebus.shared; - -import com.yahoo.osgi.annotation.ExportPackage; diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/test/package-info.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/test/package-info.java deleted file mode 100644 index 42bc03b6e17..00000000000 --- a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/test/package-info.java +++ /dev/null @@ -1,5 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -@ExportPackage -package com.yahoo.messagebus.test; - -import com.yahoo.osgi.annotation.ExportPackage; diff --git a/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/ClientThreadingTestCase.java b/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/ClientThreadingTestCase.java deleted file mode 100644 index 62a9a864781..00000000000 --- a/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/ClientThreadingTestCase.java +++ /dev/null @@ -1,149 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.messagebus.jdisc; - -import com.yahoo.jdisc.application.ContainerBuilder; -import com.yahoo.jdisc.handler.FutureResponse; -import com.yahoo.jdisc.handler.ResponseHandler; -import com.yahoo.jdisc.test.TestDriver; -import com.yahoo.messagebus.DestinationSession; -import com.yahoo.messagebus.DestinationSessionParams; -import com.yahoo.messagebus.Message; -import com.yahoo.messagebus.MessageBus; -import com.yahoo.messagebus.MessageBusParams; -import com.yahoo.messagebus.MessageHandler; -import com.yahoo.messagebus.SourceSessionParams; -import com.yahoo.messagebus.network.local.LocalNetwork; -import com.yahoo.messagebus.network.local.LocalWire; -import com.yahoo.messagebus.routing.Route; -import com.yahoo.messagebus.shared.SharedMessageBus; -import com.yahoo.messagebus.shared.SharedSourceSession; -import com.yahoo.messagebus.test.SimpleMessage; -import com.yahoo.messagebus.test.SimpleProtocol; -import org.junit.Ignore; -import org.junit.Test; - -import java.net.URI; -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; - -import static org.hamcrest.CoreMatchers.is; -import static org.junit.Assert.assertThat; - -/** - * @author Simon Thoresen Hult - */ -public class ClientThreadingTestCase { - - private static final int NUM_THREADS = 32; - private static final int NUM_REQUESTS = 1000; - - @Test - @Ignore - public void requireThatClientIsThreadSafe() throws Exception { - final LocalWire wire = new LocalWire(); - final Client client = new Client(wire); - final Server server = new Server(wire); - - final List<Callable<Boolean>> lst = new LinkedList<>(); - final Route route = Route.parse(server.session.getConnectionSpec()); - for (int i = 0; i < NUM_THREADS; ++i) { - lst.add(new RequestTask(client, route)); - } - final ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS); - for (final Future<Boolean> res : executor.invokeAll(lst, 60, TimeUnit.SECONDS)) { - assertThat(res.get(), is(true)); - } - - assertThat(client.close(), is(true)); - assertThat(server.close(), is(true)); - } - - private static final class RequestTask implements Callable<Boolean> { - - final Client client; - final Route route; - - RequestTask(final Client client, final Route route) { - this.client = client; - this.route = route; - } - - @Override - public Boolean call() throws Exception { - for (int i = 0; i < NUM_REQUESTS; ++i) { - final FutureResponse responseHandler = new FutureResponse(); - client.send(new SimpleMessage("foo").setRoute(route), responseHandler); - responseHandler.get(60, TimeUnit.SECONDS); - } - return true; - } - } - - private static class Client { - - final MbusClient delegate; - final TestDriver driver; - - Client(final LocalWire wire) { - driver = TestDriver.newSimpleApplicationInstanceWithoutOsgi(); - delegate = newMbusClient(wire); - - final ContainerBuilder builder = driver.newContainerBuilder(); - builder.clientBindings().bind("mbus://*/*", delegate); - driver.activateContainer(builder); - delegate.start(); - } - - void send(final Message msg, final ResponseHandler handler) { - final MbusRequest request = new MbusRequest(driver, URI.create("mbus://remote/"), msg); - request.setServerRequest(false); - request.connect(handler).close(null); - request.release(); - } - - boolean close() { - delegate.release(); - return driver.close(); - } - } - - private static class Server implements MessageHandler { - - final MessageBus mbus; - final DestinationSession session; - - Server(final LocalWire wire) { - mbus = new MessageBus( - new LocalNetwork(wire), - new MessageBusParams().addProtocol(new SimpleProtocol())); - session = mbus.createDestinationSession( - new DestinationSessionParams().setMessageHandler(this)); - } - - @Override - public void handleMessage(final Message msg) { - session.acknowledge(msg); - } - - boolean close() { - return session.destroy() && mbus.destroy(); - } - } - - private static MbusClient newMbusClient(final LocalWire wire) { - final SharedMessageBus mbus = new SharedMessageBus(new MessageBus( - new LocalNetwork(wire), - new MessageBusParams().addProtocol(new SimpleProtocol()))); - final SharedSourceSession session = mbus.newSourceSession( - new SourceSessionParams()); - final MbusClient client = new MbusClient(session); - session.release(); - mbus.release(); - return client; - } -} diff --git a/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/MbusClientTestCase.java b/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/MbusClientTestCase.java deleted file mode 100644 index 9cfd1fd02b9..00000000000 --- a/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/MbusClientTestCase.java +++ /dev/null @@ -1,345 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.messagebus.jdisc; - -import com.yahoo.jdisc.Request; -import com.yahoo.jdisc.ResourceReference; -import com.yahoo.jdisc.Response; -import com.yahoo.jdisc.handler.CompletionHandler; -import com.yahoo.jdisc.handler.ContentChannel; -import com.yahoo.jdisc.handler.RequestDeniedException; -import com.yahoo.jdisc.handler.ResponseHandler; -import com.yahoo.messagebus.*; -import com.yahoo.messagebus.jdisc.test.ClientTestDriver; -import com.yahoo.messagebus.shared.ClientSession; -import com.yahoo.messagebus.test.SimpleMessage; -import com.yahoo.messagebus.test.SimpleReply; -import org.junit.Test; - -import java.net.URI; -import java.nio.ByteBuffer; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import static org.junit.Assert.*; - -/** - * @author Simon Thoresen Hult - */ -public class MbusClientTestCase { - - @Test - public void requireThatClientRetainsSession() { - MySession session = new MySession(); - assertEquals(1, session.refCount); - MbusClient client = new MbusClient(session); - assertEquals(2, session.refCount); - session.release(); - assertEquals(1, session.refCount); - client.destroy(); - assertEquals(0, session.refCount); - } - - @Test - public void requireThatRequestResponseWorks() { - ClientTestDriver driver = ClientTestDriver.newInstance(); - MyResponseHandler responseHandler = MyResponseHandler.newInstance(); - assertTrue(driver.sendMessage(new SimpleMessage("foo"), responseHandler)); - assertTrue(driver.awaitMessageAndSendReply(new EmptyReply())); - - Response response = responseHandler.awaitResponse(); - assertNotNull(response); - assertEquals(Response.Status.OK, response.getStatus()); - assertTrue(driver.close()); - } - - @Test - public void requireThatNonMbusRequestIsDenied() throws InterruptedException { - ClientTestDriver driver = ClientTestDriver.newInstance(); - Request serverReq = null; - Request clientReq = null; - try { - serverReq = driver.newServerRequest(); - clientReq = new Request(serverReq, URI.create("mbus://host/path")); - clientReq.connect(MyResponseHandler.newInstance()); - fail(); - } catch (RequestDeniedException e) { - System.out.println(e.getMessage()); - } finally { - if (serverReq != null) { - serverReq.release(); - } - if (clientReq != null) { - clientReq.release(); - } - } - assertTrue(driver.close()); - } - - @Test - public void requireThatRequestContentDoesNotSupportWrite() throws InterruptedException { - ClientTestDriver driver = ClientTestDriver.newInstance(); - MyResponseHandler responseHandler = MyResponseHandler.newInstance(); - - Request request = null; - ContentChannel content; - try { - request = driver.newClientRequest(new SimpleMessage("foo")); - content = request.connect(responseHandler); - } finally { - if (request != null) { - request.release(); - } - } - try { - content.write(ByteBuffer.allocate(69), null); - fail(); - } catch (UnsupportedOperationException e) { - - } - content.close(null); - - assertTrue(driver.awaitMessageAndSendReply(new EmptyReply())); - assertNotNull(responseHandler.awaitResponse()); - assertTrue(driver.close()); - } - - @Test - public void requireThatResponseIsMbus() { - ClientTestDriver driver = ClientTestDriver.newInstance(); - MyResponseHandler responseHandler = MyResponseHandler.newInstance(); - assertTrue(driver.sendMessage(new SimpleMessage("foo"), responseHandler)); - assertTrue(driver.awaitMessageAndSendReply(new EmptyReply())); - - Response response = responseHandler.awaitResponse(); - assertTrue(response instanceof MbusResponse); - Reply reply = ((MbusResponse)response).getReply(); - assertTrue(reply instanceof EmptyReply); - assertTrue(driver.close()); - } - - @Test - public void requireThatServerReceivesGivenMessage() { - ClientTestDriver driver = ClientTestDriver.newInstance(); - MyResponseHandler responseHandler = MyResponseHandler.newInstance(); - assertTrue(driver.sendMessage(new SimpleMessage("foo"), responseHandler)); - - Message msg = driver.awaitMessage(); - assertTrue(msg instanceof SimpleMessage); - assertEquals("foo", ((SimpleMessage)msg).getValue()); - - Reply reply = new EmptyReply(); - reply.swapState(msg); - driver.sendReply(reply); - - assertNotNull(responseHandler.awaitResponse()); - assertTrue(driver.close()); - } - - @Test - public void requireThatClientReceivesGivenReply() { - ClientTestDriver driver = ClientTestDriver.newInstance(); - MyResponseHandler responseHandler = MyResponseHandler.newInstance(); - assertTrue(driver.sendMessage(new SimpleMessage("foo"), responseHandler)); - - Message msg = driver.awaitMessage(); // TODO: Timing sensitive - assertNotNull(msg); - Reply reply = new SimpleReply("bar"); - reply.swapState(msg); - driver.sendReply(reply); - - Response response = responseHandler.awaitResponse(); - assertTrue(response instanceof MbusResponse); - reply = ((MbusResponse)response).getReply(); - assertTrue(reply instanceof SimpleReply); - assertEquals("bar", ((SimpleReply)reply).getValue()); - assertTrue(driver.close()); - } - - @Test - public void requireThatStateIsTransferredToResponse() { - ClientTestDriver driver = ClientTestDriver.newInstance(); - MyResponseHandler responseHandler = MyResponseHandler.newInstance(); - - Message msg = new SimpleMessage("foo"); - Object pushedCtx = new Object(); - msg.setContext(pushedCtx); - ReplyHandler pushedHandler = new MyReplyHandler(); - msg.pushHandler(pushedHandler); - Object currentCtx = new Object(); - msg.setContext(currentCtx); - msg.getTrace().setLevel(6); - assertTrue(driver.sendMessage(msg, responseHandler)); - assertTrue(driver.awaitMessageAndSendReply(new EmptyReply())); - - Response response = responseHandler.awaitResponse(); - assertTrue(response.getClass().getName(), response instanceof MbusResponse); - Reply reply = ((MbusResponse)response).getReply(); - assertSame(currentCtx, reply.getContext()); - assertEquals(6, reply.getTrace().getLevel()); - assertSame(pushedHandler, reply.popHandler()); - assertSame(pushedCtx, reply.getContext()); - assertTrue(driver.close()); - } - - @Test - public void requireThatStateIsTransferredToSyncMbusSendFailureResponse() { - ClientTestDriver driver = ClientTestDriver.newInstance(); - driver.sourceSession().close(); - - Message msg = new SimpleMessage("foo"); - ReplyHandler pushedHandler = new MyReplyHandler(); - Object pushedCtx = new Object(); - msg.setContext(pushedCtx); - msg.pushHandler(pushedHandler); - Object currentCtx = new Object(); - msg.setContext(currentCtx); - msg.getTrace().setLevel(6); - - MyResponseHandler responseHandler = MyResponseHandler.newInstance(); - driver.sendMessage(msg, responseHandler); - - Response response = responseHandler.awaitResponse(); - assertNotNull(response); - assertTrue(response.getClass().getName(), response instanceof MbusResponse); - Reply reply = ((MbusResponse)response).getReply(); - assertSame(currentCtx, reply.getContext()); - assertEquals(6, reply.getTrace().getLevel()); - assertSame(pushedHandler, reply.popHandler()); - assertSame(pushedCtx, reply.getContext()); - assertTrue(driver.close()); - } - - @Test - public void requireThatStateIsTransferredToTimeoutResponse() throws InterruptedException { - ClientTestDriver driver = ClientTestDriver.newInstance(); - MyResponseHandler responseHandler = MyResponseHandler.newInstance(); - - Message msg = new SimpleMessage("foo"); - ReplyHandler pushedHandler = new MyReplyHandler(); - Object pushedCtx = new Object(); - msg.setContext(pushedCtx); - msg.pushHandler(pushedHandler); - Object currentCtx = new Object(); - msg.setContext(currentCtx); - msg.getTrace().setLevel(6); - - Request request = driver.newClientRequest(msg); - request.setTimeout(1, TimeUnit.MILLISECONDS); - assertTrue(driver.sendRequest(request, responseHandler)); - request.release(); - - Response response = responseHandler.awaitResponse(); - assertNotNull(response); - assertTrue(response.getClass().getName(), response instanceof MbusResponse); - Reply reply = ((MbusResponse)response).getReply(); - assertSame(currentCtx, reply.getContext()); - assertEquals(6, reply.getTrace().getLevel()); - assertSame(pushedHandler, reply.popHandler()); - assertSame(pushedCtx, reply.getContext()); - assertTrue(driver.close()); - } - - @Test - public void requireThatSyncMbusSendFailureRespondsWithError() { - ClientTestDriver driver = ClientTestDriver.newInstance(); - driver.sourceSession().close(); - - MyResponseHandler responseHandler = MyResponseHandler.newInstance(); - driver.sendMessage(new SimpleMessage("foo"), responseHandler); - Response response = responseHandler.awaitResponse(); - assertNotNull(response); - assertTrue(response.getClass().getName(), response instanceof MbusResponse); - Reply reply = ((MbusResponse)response).getReply(); - assertEquals(1, reply.getNumErrors()); - assertEquals(ErrorCode.SEND_QUEUE_CLOSED, reply.getError(0).getCode()); - assertTrue(driver.close()); - } - - private static class MyResponseHandler implements ResponseHandler { - - final MyResponseContent content; - Response response; - - MyResponseHandler(MyResponseContent content) { - this.content = content; - } - - Response awaitResponse() { - try { - content.closeLatch.await(60, TimeUnit.SECONDS); - } catch (InterruptedException e) { - throw new IllegalStateException(e); - } - if (response instanceof MbusResponse) { - //System.out.println(((MbusResponse)response).getReply().getTrace()); - } - return response; - } - - @Override - public ContentChannel handleResponse(Response response) { - this.response = response; - return content; - } - - static MyResponseHandler newInstance() { - return new MyResponseHandler(new MyResponseContent()); - } - } - - private static class MyResponseContent implements ContentChannel { - - final CountDownLatch writeLatch = new CountDownLatch(1); - final CountDownLatch closeLatch = new CountDownLatch(1); - - @Override - public void write(ByteBuffer buf, CompletionHandler handler) { - if (handler != null) { - handler.completed(); - } - writeLatch.countDown(); - } - - @Override - public void close(CompletionHandler handler) { - if (handler != null) { - handler.completed(); - } - closeLatch.countDown(); - } - } - - private static class MySession implements ClientSession { - - int refCount = 1; - - @Override - public Result sendMessage(Message msg) { - return null; - } - - @Override - public ResourceReference refer() { - ++refCount; - return new ResourceReference() { - @Override - public void close() { - --refCount; - } - }; - } - - @Override - public void release() { - --refCount; - } - } - - private static class MyReplyHandler implements ReplyHandler { - - @Override - public void handleReply(Reply reply) { - - } - } -} diff --git a/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/MbusRequestHandlerTestCase.java b/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/MbusRequestHandlerTestCase.java deleted file mode 100644 index 316ad18bae9..00000000000 --- a/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/MbusRequestHandlerTestCase.java +++ /dev/null @@ -1,121 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.messagebus.jdisc; - -import com.google.common.util.concurrent.ListenableFuture; -import com.yahoo.jdisc.Request; -import com.yahoo.jdisc.Response; -import com.yahoo.jdisc.application.ContainerBuilder; -import com.yahoo.jdisc.handler.RequestDispatch; -import com.yahoo.jdisc.test.TestDriver; -import com.yahoo.messagebus.EmptyReply; -import com.yahoo.messagebus.Message; -import com.yahoo.messagebus.Reply; -import com.yahoo.messagebus.test.SimpleMessage; -import org.junit.Test; - -import java.net.URI; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -/** - * @author Simon Thoresen Hult - */ -public class MbusRequestHandlerTestCase { - - @Test - public void requireThatNonMbusRequestThrows() throws Exception { - final TestDriver driver = newTestDriver(SameThreadReplier.INSTANCE); - try { - new RequestDispatch() { - - @Override - protected Request newRequest() { - return new Request(driver, URI.create("mbus://localhost/")); - } - }.connect(); - fail(); - } catch (UnsupportedOperationException e) { - assertEquals("Expected MbusRequest, got com.yahoo.jdisc.Request.", e.getMessage()); - } - assertTrue(driver.close()); - } - - @Test - public void requireThatHandlerCanRespondInSameThread() throws Exception { - TestDriver driver = newTestDriver(SameThreadReplier.INSTANCE); - - Response response = dispatchMessage(driver, new SimpleMessage("msg")).get(60, TimeUnit.SECONDS); - assertTrue(response instanceof MbusResponse); - assertEquals(Response.Status.OK, response.getStatus()); - Reply reply = ((MbusResponse)response).getReply(); - assertTrue(reply instanceof EmptyReply); - assertFalse(reply.hasErrors()); - - assertTrue(driver.close()); - } - - @Test - public void requireThatHandlerCanRespondInOtherThread() throws Exception { - TestDriver driver = newTestDriver(ThreadedReplier.INSTANCE); - - Response response = dispatchMessage(driver, new SimpleMessage("msg")).get(60, TimeUnit.SECONDS); - assertTrue(response instanceof MbusResponse); - assertEquals(Response.Status.OK, response.getStatus()); - Reply reply = ((MbusResponse)response).getReply(); - assertTrue(reply instanceof EmptyReply); - assertFalse(reply.hasErrors()); - - assertTrue(driver.close()); - } - - private static TestDriver newTestDriver(MbusRequestHandler handler) { - TestDriver driver = TestDriver.newSimpleApplicationInstanceWithoutOsgi(); - ContainerBuilder builder = driver.newContainerBuilder(); - builder.serverBindings().bind("mbus://*/*", handler); - driver.activateContainer(builder); - return driver; - } - - private static ListenableFuture<Response> dispatchMessage(final TestDriver driver, final Message msg) { - return new RequestDispatch() { - - @Override - protected Request newRequest() { - return new MbusRequest(driver, URI.create("mbus://localhost/"), msg); - } - }.dispatch(); - } - - private static class SameThreadReplier extends MbusRequestHandler { - - final static SameThreadReplier INSTANCE = new SameThreadReplier(); - - @Override - public void handleMessage(Message msg) { - Reply reply = new EmptyReply(); - reply.swapState(msg); - reply.popHandler().handleReply(reply); - } - } - - private static class ThreadedReplier extends MbusRequestHandler { - - final static ThreadedReplier INSTANCE = new ThreadedReplier(); - - @Override - public void handleMessage(final Message msg) { - Executors.newSingleThreadExecutor().execute(new Runnable() { - - @Override - public void run() { - SameThreadReplier.INSTANCE.handleMessage(msg); - } - }); - } - } -} diff --git a/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/MbusRequestTestCase.java b/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/MbusRequestTestCase.java deleted file mode 100644 index c68ab4e6742..00000000000 --- a/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/MbusRequestTestCase.java +++ /dev/null @@ -1,73 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.messagebus.jdisc; - -import com.yahoo.jdisc.test.TestDriver; -import com.yahoo.messagebus.Message; -import com.yahoo.messagebus.test.SimpleMessage; -import com.yahoo.text.Utf8String; -import org.junit.Test; - -import java.net.URI; - -import static org.junit.Assert.assertSame; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -/** - * @author Simon Thoresen Hult - */ -public class MbusRequestTestCase { - - @Test - public void requireThatAccessorsWork() { - TestDriver driver = TestDriver.newSimpleApplicationInstanceWithoutOsgi(); - driver.activateContainer(driver.newContainerBuilder()); - - MyMessage msg = new MyMessage(); - MbusRequest request = new MbusRequest(driver, URI.create("mbus://host/path"), msg); - assertSame(msg, request.getMessage()); - request.release(); - driver.close(); - } - - @Test - public void requireThatMessageCanNotBeNullInRootRequest() { - TestDriver driver = TestDriver.newSimpleApplicationInstanceWithoutOsgi(); - driver.activateContainer(driver.newContainerBuilder()); - try { - new MbusRequest(driver, URI.create("mbus://host/path"), null); - fail(); - } catch (NullPointerException e) { - // expected - } - assertTrue(driver.close()); - } - - @Test - public void requireThatMessageCanNotBeNullInChildRequest() { - TestDriver driver = TestDriver.newSimpleApplicationInstanceWithoutOsgi(); - driver.activateContainer(driver.newContainerBuilder()); - MbusRequest parent = new MbusRequest(driver, URI.create("mbus://host/path"), new SimpleMessage("foo")); - try { - new MbusRequest(parent, URI.create("mbus://host/path"), null); - fail(); - } catch (NullPointerException e) { - // expected - } - parent.release(); - assertTrue(driver.close()); - } - - private class MyMessage extends Message { - - @Override - public Utf8String getProtocol() { - return null; - } - - @Override - public int getType() { - return 0; - } - } -} diff --git a/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/MbusResponseTestCase.java b/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/MbusResponseTestCase.java deleted file mode 100644 index eb4cb949770..00000000000 --- a/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/MbusResponseTestCase.java +++ /dev/null @@ -1,46 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.messagebus.jdisc; - -import com.yahoo.jdisc.Response; -import com.yahoo.messagebus.Reply; -import com.yahoo.text.Utf8String; -import org.junit.Test; - -import static org.junit.Assert.assertSame; -import static org.junit.Assert.fail; - -/** - * @author Simon Thoresen Hult - */ -public class MbusResponseTestCase { - - @Test - public void requireThatAccessorsWork() { - MyReply reply = new MyReply(); - MbusResponse response = new MbusResponse(Response.Status.OK, reply); - assertSame(reply, response.getReply()); - } - - @Test - public void requireThatReplyCanNotBeNull() { - try { - new MbusResponse(Response.Status.OK, null); - fail(); - } catch (NullPointerException e) { - - } - } - - private class MyReply extends Reply { - - @Override - public Utf8String getProtocol() { - return null; - } - - @Override - public int getType() { - return 0; - } - } -} diff --git a/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/MbusServerConformanceTest.java b/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/MbusServerConformanceTest.java deleted file mode 100644 index bf89f3869ed..00000000000 --- a/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/MbusServerConformanceTest.java +++ /dev/null @@ -1,694 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.messagebus.jdisc; - -import com.google.inject.AbstractModule; -import com.google.inject.Module; -import com.yahoo.jdisc.test.ServerProviderConformanceTest; -import com.yahoo.messagebus.DestinationSessionParams; -import com.yahoo.messagebus.MessageBus; -import com.yahoo.messagebus.MessageBusParams; -import com.yahoo.messagebus.Reply; -import com.yahoo.messagebus.ReplyHandler; -import com.yahoo.messagebus.SourceSession; -import com.yahoo.messagebus.SourceSessionParams; -import com.yahoo.messagebus.network.local.LocalNetwork; -import com.yahoo.messagebus.network.local.LocalWire; -import com.yahoo.messagebus.routing.Route; -import com.yahoo.messagebus.shared.ServerSession; -import com.yahoo.messagebus.shared.SharedMessageBus; -import com.yahoo.messagebus.test.SimpleMessage; -import com.yahoo.messagebus.test.SimpleProtocol; -import org.hamcrest.Matcher; -import org.junit.Ignore; -import org.junit.Test; - -import java.io.Closeable; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.Collections; -import java.util.concurrent.BlockingDeque; -import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.TimeUnit; - -import static com.yahoo.messagebus.ErrorCode.APP_FATAL_ERROR; -import static com.yahoo.messagebus.ErrorCode.SESSION_BUSY; -import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.CoreMatchers.notNullValue; -import static org.hamcrest.CoreMatchers.nullValue; -import static org.hamcrest.MatcherAssert.assertThat; - -/** - * @author Simon Thoresen Hult - */ -public class MbusServerConformanceTest extends ServerProviderConformanceTest { - - /* Many of the "success" expectations here (may) seem odd. But this is the current behavior of the - * messagebus server. We should probably look into whether the behavior is correct in all cases. - */ - - @Override - @Test - public void testContainerNotReadyException() throws Throwable { - new TestRunner().setRequestTimeout(100, TimeUnit.MILLISECONDS) - .expectError(is(SESSION_BUSY)) - .executeAndClose(); - } - - @Override - @Test - public void testBindingSetNotFoundException() throws Throwable { - new TestRunner().expectError(is(APP_FATAL_ERROR)) - .executeAndClose(); - } - - @Override - @Test - public void testNoBindingSetSelectedException() throws Throwable { - new TestRunner().expectError(is(APP_FATAL_ERROR)) - .executeAndClose(); - } - - @Override - @Test - public void testBindingNotFoundException() throws Throwable { - new TestRunner().expectError(is(APP_FATAL_ERROR)) - .executeAndClose(); - } - - @Override - @Test - public void testRequestHandlerWithSyncCloseResponse() throws Throwable { - new TestRunner().expectSuccess() - .executeAndClose(); - } - - @Override - @Test - public void testRequestHandlerWithSyncWriteResponse() throws Throwable { - new TestRunner().expectSuccess() - .executeAndClose(); - } - - @Override - @Test - public void testRequestHandlerWithSyncHandleResponse() throws Throwable { - new TestRunner().expectSuccess() - .executeAndClose(); - } - - @Override - @Test - public void testRequestHandlerWithAsyncHandleResponse() throws Throwable { - new TestRunner().expectSuccess() - .executeAndClose(); - } - - @Override - @Test - public void testRequestException() throws Throwable { - new TestRunner().expectError(is(APP_FATAL_ERROR)) - .executeAndClose(); - } - - @Override - @Test - public void testRequestExceptionWithSyncCloseResponse() throws Throwable { - new TestRunner().expectSuccess() - .executeAndClose(); - } - - @Override - @Test - public void testRequestExceptionWithSyncWriteResponse() throws Throwable { - new TestRunner().expectSuccess() - .executeAndClose(); - } - - @Override - @Test - public void testRequestNondeterministicExceptionWithSyncHandleResponse() throws Throwable { - new TestRunner().expectSuccess() - .executeAndClose(); - } - - @Override - @Test - public void testRequestExceptionBeforeResponseWriteWithSyncHandleResponse() throws Throwable { - new TestRunner().expectSuccess() - .executeAndClose(); - } - - @Override - @Test - @Ignore // N/A: The messagebus protocol does not have content. - public void testRequestExceptionAfterResponseWriteWithSyncHandleResponse() throws Throwable { - } - - @Override - @Test - public void testRequestNondeterministicExceptionWithAsyncHandleResponse() throws Throwable { - new TestRunner().executeAndClose(); - } - - @Override - @Test - public void testRequestExceptionBeforeResponseWriteWithAsyncHandleResponse() throws Throwable { - new TestRunner().expectError(is(APP_FATAL_ERROR)) - .executeAndClose(); - } - - @Override - @Test - public void testRequestExceptionAfterResponseCloseNoContentWithAsyncHandleResponse() throws Throwable { - new TestRunner().expectSuccess() - .executeAndClose(); - } - - @Override - @Test - @Ignore // N/A: The messagebus protocol does not have content. - public void testRequestExceptionAfterResponseWriteWithAsyncHandleResponse() throws Throwable { - } - - @Override - @Test - @Ignore // N/A: The messagebus protocol does not have content. - public void testRequestContentWriteWithSyncCompletion() throws Throwable { - } - - @Override - @Test - @Ignore // N/A: The messagebus protocol does not have content. - public void testRequestContentWriteWithAsyncCompletion() throws Throwable { - } - - @Override - @Test - @Ignore // N/A: The messagebus protocol does not have content. - public void testRequestContentWriteWithNondeterministicSyncFailure() throws Throwable { - } - - @Override - @Test - @Ignore // N/A: The messagebus protocol does not have content. - public void testRequestContentWriteWithSyncFailureBeforeResponseWrite() throws Throwable { - } - - @Override - @Test - @Ignore // N/A: The messagebus protocol does not have content. - public void testRequestContentWriteWithSyncFailureAfterResponseWrite() throws Throwable { - } - - @Override - @Test - @Ignore // N/A: The messagebus protocol does not have content. - public void testRequestContentWriteWithNondeterministicAsyncFailure() throws Throwable { - } - - @Override - @Test - @Ignore // N/A: The messagebus protocol does not have content. - public void testRequestContentWriteWithAsyncFailureBeforeResponseWrite() throws Throwable { - } - - @Override - @Test - @Ignore // N/A: The messagebus protocol does not have content. - public void testRequestContentWriteWithAsyncFailureAfterResponseWrite() throws Throwable { - } - - @Override - @Test - @Ignore // N/A: The messagebus protocol does not have content. - public void testRequestContentWriteWithAsyncFailureAfterResponseCloseNoContent() throws Throwable { - } - - @Override - @Test - @Ignore // N/A: The messagebus protocol does not have content. - public void testRequestContentWriteNondeterministicException() throws Throwable { - } - - @Override - @Test - @Ignore // N/A: The messagebus protocol does not have content. - public void testRequestContentWriteExceptionBeforeResponseWrite() throws Throwable { - } - - @Override - @Test - @Ignore // N/A: The messagebus protocol does not have content. - public void testRequestContentWriteExceptionAfterResponseWrite() throws Throwable { - } - - @Override - @Test - @Ignore // N/A: The messagebus protocol does not have content. - public void testRequestContentWriteExceptionAfterResponseCloseNoContent() throws Throwable { - } - - @Override - @Test - @Ignore // N/A: The messagebus protocol does not have content. - public void testRequestContentWriteNondeterministicExceptionWithSyncCompletion() throws Throwable { - } - - @Override - @Test - @Ignore // N/A: The messagebus protocol does not have content. - public void testRequestContentWriteExceptionBeforeResponseWriteWithSyncCompletion() throws Throwable { - } - - @Override - @Test - @Ignore // N/A: The messagebus protocol does not have content. - public void testRequestContentWriteExceptionAfterResponseWriteWithSyncCompletion() throws Throwable { - } - - @Override - @Test - @Ignore // N/A: The messagebus protocol does not have content. - public void testRequestContentWriteExceptionAfterResponseCloseNoContentWithSyncCompletion() throws Throwable { - } - - @Override - @Test - @Ignore // N/A: The messagebus protocol does not have content. - public void testRequestContentWriteNondeterministicExceptionWithAsyncCompletion() throws Throwable { - } - - @Override - @Test - @Ignore // N/A: The messagebus protocol does not have content. - public void testRequestContentWriteExceptionBeforeResponseWriteWithAsyncCompletion() throws Throwable { - } - - @Override - @Test - @Ignore // N/A: The messagebus protocol does not have content. - public void testRequestContentWriteExceptionAfterResponseWriteWithAsyncCompletion() throws Throwable { - } - - @Override - @Test - @Ignore // N/A: The messagebus protocol does not have content. - public void testRequestContentWriteExceptionAfterResponseCloseNoContentWithAsyncCompletion() throws Throwable { - } - - @Override - @Test - @Ignore // N/A: The messagebus protocol does not have content. - public void testRequestContentWriteExceptionWithNondeterministicSyncFailure() throws Throwable { - } - - @Override - @Test - @Ignore // N/A: The messagebus protocol does not have content. - public void testRequestContentWriteExceptionWithSyncFailureBeforeResponseWrite() throws Throwable { - } - - @Override - @Test - @Ignore // N/A: The messagebus protocol does not have content. - public void testRequestContentWriteExceptionWithSyncFailureAfterResponseWrite() throws Throwable { - } - - @Override - @Test - @Ignore // N/A: The messagebus protocol does not have content. - public void testRequestContentWriteExceptionWithSyncFailureAfterResponseCloseNoContent() throws Throwable { - } - - @Override - @Test - @Ignore // N/A: The messagebus protocol does not have content. - public void testRequestContentWriteExceptionWithNondeterministicAsyncFailure() throws Throwable { - } - - @Override - @Test - @Ignore // N/A: The messagebus protocol does not have content. - public void testRequestContentWriteExceptionWithAsyncFailureBeforeResponseWrite() throws Throwable { - } - - @Override - @Test - @Ignore // N/A: The messagebus protocol does not have content. - public void testRequestContentWriteExceptionWithAsyncFailureAfterResponseWrite() throws Throwable { - } - - @Override - @Test - @Ignore // N/A: The messagebus protocol does not have content. - public void testRequestContentWriteExceptionWithAsyncFailureAfterResponseCloseNoContent() throws Throwable { - } - - @Override - @Test - public void testRequestContentCloseWithSyncCompletion() throws Throwable { - new TestRunner().expectSuccess() - .executeAndClose(); - } - - @Override - @Test - public void testRequestContentCloseWithAsyncCompletion() throws Throwable { - new TestRunner().expectSuccess() - .executeAndClose(); - } - - @Override - @Test - public void testRequestContentCloseWithNondeterministicSyncFailure() throws Throwable { - new TestRunner().expectSuccess() - .executeAndClose(); - } - - @Override - @Test - public void testRequestContentCloseWithSyncFailureBeforeResponseWrite() throws Throwable { - new TestRunner().expectSuccess() - .executeAndClose(); - } - - @Override - @Test - @Ignore // N/A: The messagebus protocol does not have content. - public void testRequestContentCloseWithSyncFailureAfterResponseWrite() throws Throwable { - } - - @Override - @Test - public void testRequestContentCloseWithSyncFailureAfterResponseCloseNoContent() throws Throwable { - new TestRunner().expectSuccess() - .executeAndClose(); - } - - @Override - @Test - public void testRequestContentCloseWithNondeterministicAsyncFailure() throws Throwable { - new TestRunner().expectSuccess() - .executeAndClose(); - } - - @Override - @Test - public void testRequestContentCloseWithAsyncFailureBeforeResponseWrite() throws Throwable { - new TestRunner().expectSuccess() - .executeAndClose(); - } - - @Override - @Test - @Ignore // N/A: The messagebus protocol does not have content. - public void testRequestContentCloseWithAsyncFailureAfterResponseWrite() throws Throwable { - } - - @Override - @Test - public void testRequestContentCloseWithAsyncFailureAfterResponseCloseNoContent() throws Throwable { - new TestRunner().expectSuccess() - .executeAndClose(); - } - - @Override - @Test - public void testRequestContentCloseNondeterministicException() throws Throwable { - new TestRunner().expectSuccess() - .executeAndClose(); - } - - @Override - @Test - public void testRequestContentCloseExceptionBeforeResponseWrite() throws Throwable { - new TestRunner().expectSuccess() - .executeAndClose(); - } - - @Override - @Test - @Ignore // N/A: The messagebus protocol does not have content. - public void testRequestContentCloseExceptionAfterResponseWrite() throws Throwable { - } - - @Override - @Test - public void testRequestContentCloseExceptionAfterResponseCloseNoContent() throws Throwable { - new TestRunner().expectSuccess() - .executeAndClose(); - } - - @Override - @Test - public void testRequestContentCloseNondeterministicExceptionWithSyncCompletion() throws Throwable { - new TestRunner().expectSuccess() - .executeAndClose(); - } - - @Override - @Test - public void testRequestContentCloseExceptionBeforeResponseWriteWithSyncCompletion() throws Throwable { - new TestRunner().expectSuccess() - .executeAndClose(); - } - - @Override - @Test - @Ignore // N/A: The messagebus protocol does not have content. - public void testRequestContentCloseExceptionAfterResponseWriteWithSyncCompletion() throws Throwable { - } - - @Override - @Test - public void testRequestContentCloseExceptionAfterResponseCloseNoContentWithSyncCompletion() throws Throwable { - new TestRunner().expectSuccess() - .executeAndClose(); - } - - @Override - @Test - public void testRequestContentCloseNondeterministicExceptionWithAsyncCompletion() throws Throwable { - new TestRunner().expectSuccess() - .executeAndClose(); - } - - @Override - @Test - public void testRequestContentCloseExceptionBeforeResponseWriteWithAsyncCompletion() throws Throwable { - new TestRunner().expectSuccess() - .executeAndClose(); - } - - @Override - @Test - @Ignore // N/A: The messagebus protocol does not have content. - public void testRequestContentCloseExceptionAfterResponseWriteWithAsyncCompletion() throws Throwable { - } - - @Override - @Test - public void testRequestContentCloseExceptionAfterResponseCloseNoContentWithAsyncCompletion() throws Throwable { - new TestRunner().expectSuccess() - .executeAndClose(); - } - - @Override - @Test - public void testRequestContentCloseNondeterministicExceptionWithSyncFailure() throws Throwable { - new TestRunner().expectSuccess() - .executeAndClose(); - } - - @Override - @Test - public void testRequestContentCloseExceptionBeforeResponseWriteWithSyncFailure() throws Throwable { - new TestRunner().expectSuccess() - .executeAndClose(); - } - - @Override - @Test - @Ignore // N/A: The messagebus protocol does not have content. - public void testRequestContentCloseExceptionAfterResponseWriteWithSyncFailure() throws Throwable { - } - - @Override - @Test - public void testRequestContentCloseExceptionAfterResponseCloseNoContentWithSyncFailure() throws Throwable { - new TestRunner().expectSuccess() - .executeAndClose(); - } - - @Override - @Test - public void testRequestContentCloseNondeterministicExceptionWithAsyncFailure() throws Throwable { - new TestRunner().expectSuccess() - .executeAndClose(); - } - - @Override - @Test - public void testRequestContentCloseExceptionBeforeResponseWriteWithAsyncFailure() throws Throwable { - new TestRunner().expectSuccess() - .executeAndClose(); - } - - @Override - @Test - @Ignore // N/A: The messagebus protocol does not have content. - public void testRequestContentCloseExceptionAfterResponseWriteWithAsyncFailure() throws Throwable { - } - - @Override - @Test - public void testRequestContentCloseExceptionAfterResponseCloseNoContentWithAsyncFailure() throws Throwable { - new TestRunner().expectSuccess() - .executeAndClose(); - } - - @Override - @Test - @Ignore // N/A: The messagebus protocol does not have content. - public void testResponseWriteCompletionException() throws Throwable { - } - - @Override - @Test - public void testResponseCloseCompletionException() throws Throwable { - new TestRunner().expectSuccess() - .executeAndClose(); - } - - @Override - @Test - public void testResponseCloseCompletionExceptionNoContent() throws Throwable { - new TestRunner().expectSuccess() - .executeAndClose(); - } - - private class TestRunner implements Adapter<MbusServer, MyClient, Reply> { - - final LocalWire wire = new LocalWire(); - final SharedMessageBus mbus; - final ServerSession session; - Matcher<Integer> expectedError = null; - boolean successExpected = false; - long timeoutMillis = TimeUnit.SECONDS.toMillis(60); - - TestRunner() { - this(new MessageBusParams().addProtocol(new SimpleProtocol()), - new DestinationSessionParams()); - } - - TestRunner(MessageBusParams mbusParams, DestinationSessionParams sessionParams) { - this.mbus = new SharedMessageBus(new MessageBus(new LocalNetwork(wire), mbusParams)); - this.session = mbus.newDestinationSession(sessionParams); - } - - TestRunner setRequestTimeout(long timeout, TimeUnit unit) { - timeoutMillis = unit.toMillis(timeout); - return this; - } - - TestRunner expectError(Matcher<Integer> matcher) { - assertThat(successExpected, is(false)); - expectedError = matcher; - return this; - } - - TestRunner expectSuccess() { - assertThat(expectedError, is(nullValue())); - successExpected = true; - return this; - } - - @Override - public Module newConfigModule() { - return new AbstractModule() { - - @Override - protected void configure() { - bind(ServerSession.class).toInstance(session); - } - }; - } - - @Override - public Class<MbusServer> getServerProviderClass() { - return MbusServer.class; - } - - @Override - public MyClient newClient(MbusServer server) throws Throwable { - return new MyClient(wire, server.connectionSpec()); - } - - @Override - public Reply executeRequest(MyClient client, boolean withRequestContent) throws Throwable { - // This protocol doesn't have the concept of "request content", so if we are asked to send any, it's a bug. - assertThat(withRequestContent, is(false)); - - final SimpleMessage msg = new SimpleMessage("foo"); - msg.getTrace().setLevel(9); - msg.setRoute(client.route); - msg.setTimeRemaining(timeoutMillis); - assertThat("client.session.send(msg).isAccepted()", - client.session.send(msg).isAccepted(), is(true)); - - final Reply reply = client.replies.poll(60, TimeUnit.SECONDS); - assertThat("reply != null", reply, notNullValue()); - return reply; - } - - @Override - public Iterable<ByteBuffer> newResponseContent() { - return Collections.emptyList(); - } - - @Override - public void validateResponse(Reply reply) throws Throwable { - final String trace = String.valueOf(reply.getTrace()); - if (expectedError != null) { - assertThat(reply.hasErrors(), is(true)); - final int error = reply.getError(0).getCode(); - assertThat(trace, error, expectedError); - } - if (successExpected) { - assertThat(trace, reply.hasErrors(), is(false)); - } - } - - void executeAndClose() throws Throwable { - runTest(this); - session.release(); - mbus.release(); - } - } - - public static class MyClient implements Closeable, ReplyHandler { - - final BlockingDeque<Reply> replies = new LinkedBlockingDeque<>(); - final MessageBus mbus; - final Route route; - final SourceSession session; - - MyClient(LocalWire wire, String connectionSpec) { - this.mbus = new MessageBus(new LocalNetwork(wire), - new MessageBusParams().addProtocol(new SimpleProtocol())); - this.session = mbus.createSourceSession(new SourceSessionParams().setReplyHandler(this)); - this.route = Route.parse(connectionSpec); - } - - @Override - public void close() throws IOException { - session.destroy(); - mbus.destroy(); - } - - @Override - public void handleReply(Reply reply) { - replies.addLast(reply); - } - } -} diff --git a/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/MbusServerTestCase.java b/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/MbusServerTestCase.java deleted file mode 100644 index 9d45d2e7abf..00000000000 --- a/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/MbusServerTestCase.java +++ /dev/null @@ -1,374 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.messagebus.jdisc; - -import com.google.inject.AbstractModule; -import com.yahoo.jdisc.Request; -import com.yahoo.jdisc.ResourceReference; -import com.yahoo.jdisc.Response; -import com.yahoo.jdisc.application.BindingSetSelector; -import com.yahoo.jdisc.handler.*; -import com.yahoo.messagebus.Error; -import com.yahoo.messagebus.*; -import com.yahoo.messagebus.jdisc.test.ServerTestDriver; -import com.yahoo.messagebus.shared.ServerSession; -import com.yahoo.messagebus.test.SimpleMessage; -import com.yahoo.messagebus.test.SimpleReply; -import org.junit.Test; - -import java.net.URI; -import java.nio.ByteBuffer; -import java.util.Arrays; -import java.util.Collections; -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import static org.junit.Assert.*; - -/** - * @author Simon Thoresen Hult - */ -public class MbusServerTestCase { - - @Test - public void requireThatServerRetainsSession() { - MySession session = new MySession(); - assertEquals(1, session.refCount); - MbusServer server = new MbusServer(null, session); - assertEquals(2, session.refCount); - session.release(); - assertEquals(1, session.refCount); - server.destroy(); - assertEquals(0, session.refCount); - } - - @Test - public void requireThatNoBindingSetSelectedExceptionIsCaught() { - ServerTestDriver driver = ServerTestDriver.newUnboundInstance(true, new MySelector(null)); - assertTrue(driver.sendMessage(new SimpleMessage("foo"))); - assertNotNull(driver.awaitErrors(ErrorCode.APP_FATAL_ERROR)); - assertTrue(driver.close()); - } - - @Test - public void requireThatBindingSetNotFoundExceptionIsCaught() { - ServerTestDriver driver = ServerTestDriver.newUnboundInstance(true, new MySelector("foo")); - assertTrue(driver.sendMessage(new SimpleMessage("bar"))); - assertNotNull(driver.awaitErrors(ErrorCode.APP_FATAL_ERROR)); - assertTrue(driver.close()); - } - - @Test - public void requireThatContainerNotReadyExceptionIsCaught() { - ServerTestDriver driver = ServerTestDriver.newInactiveInstance(true); - assertTrue(driver.sendMessage(new SimpleMessage("foo"))); - assertNotNull(driver.awaitErrors(ErrorCode.APP_FATAL_ERROR)); - assertTrue(driver.close()); - } - - @Test - public void requireThatBindingNotFoundExceptionIsCaught() { - ServerTestDriver driver = ServerTestDriver.newUnboundInstance(true); - assertTrue(driver.sendMessage(new SimpleMessage("foo"))); - assertNotNull(driver.awaitErrors(ErrorCode.APP_FATAL_ERROR)); - assertTrue(driver.close()); - } - - @Test - public void requireThatRequestDeniedExceptionIsCaught() { - ServerTestDriver driver = ServerTestDriver.newInstance(MyRequestHandler.newRequestDenied(), true); - assertTrue(driver.sendMessage(new SimpleMessage("foo"))); - assertNotNull(driver.awaitErrors(ErrorCode.APP_FATAL_ERROR)); - assertTrue(driver.close()); - } - - @Test - public void requireThatRequestResponseWorks() { - MyRequestHandler requestHandler = MyRequestHandler.newInstance(); - ServerTestDriver driver = ServerTestDriver.newInstance(requestHandler, true); - assertTrue(driver.sendMessage(new SimpleMessage("foo"))); - - assertNotNull(requestHandler.awaitRequest()); - assertTrue(requestHandler.sendResponse(new Response(Response.Status.OK))); - - assertNotNull(driver.awaitSuccess()); - assertTrue(driver.close()); - } - - @Test - public void requireThatRequestIsMbus() { - MyRequestHandler requestHandler = MyRequestHandler.newInstance(); - ServerTestDriver driver = ServerTestDriver.newInstance(requestHandler, true); - assertTrue(driver.sendMessage(new SimpleMessage("foo"))); - - Request request = requestHandler.awaitRequest(); - assertTrue(request instanceof MbusRequest); - Message msg = ((MbusRequest)request).getMessage(); - assertTrue(msg instanceof SimpleMessage); - assertEquals("foo", ((SimpleMessage)msg).getValue()); - assertTrue(requestHandler.sendResponse(new Response(Response.Status.OK))); - - assertNotNull(driver.awaitSuccess()); - assertTrue(driver.close()); - } - - @Test - public void requireThatReplyInsideMbusResponseIsUsed() { - MyRequestHandler requestHandler = MyRequestHandler.newInstance(); - ServerTestDriver driver = ServerTestDriver.newInstance(requestHandler, true); - assertTrue(driver.sendMessage(new SimpleMessage("foo"))); - - assertNotNull(requestHandler.awaitRequest()); - Reply reply = new SimpleReply("bar"); - reply.swapState(((MbusRequest)requestHandler.request).getMessage()); - assertTrue(requestHandler.sendResponse(new MbusResponse(Response.Status.OK, reply))); - - reply = driver.awaitSuccess(); - assertTrue(reply instanceof SimpleReply); - assertEquals("bar", ((SimpleReply)reply).getValue()); - - assertTrue(driver.close()); - } - - @Test - public void requireThatNonMbusResponseCausesEmptyReply() { - MyRequestHandler requestHandler = MyRequestHandler.newInstance(); - ServerTestDriver driver = ServerTestDriver.newInstance(requestHandler, true); - assertTrue(driver.sendMessage(new SimpleMessage("foo"))); - - assertNotNull(requestHandler.awaitRequest()); - assertTrue(requestHandler.sendResponse(new Response(Response.Status.OK))); - - assertNotNull(driver.awaitSuccess()); - assertTrue(driver.close()); - } - - @Test - public void requireThatMbusRequestContentCallsCompletion() throws InterruptedException { - MyRequestHandler requestHandler = MyRequestHandler.newInstance(); - ServerTestDriver driver = ServerTestDriver.newInstance(requestHandler, true); - assertTrue(driver.sendMessage(new SimpleMessage("foo"))); - - assertNotNull(requestHandler.awaitRequest()); - ContentChannel content = requestHandler.responseHandler.handleResponse(new Response(Response.Status.OK)); - assertNotNull(content); - MyCompletion completion = new MyCompletion(); - content.close(completion); - assertTrue(completion.completedLatch.await(60, TimeUnit.SECONDS)); - - assertNotNull(driver.awaitSuccess()); - assertTrue(driver.close()); - } - - @Test - public void requireThatResponseContentDoesNotSupportWrite() { - MyRequestHandler requestHandler = MyRequestHandler.newInstance(); - ServerTestDriver driver = ServerTestDriver.newInstance(requestHandler, true); - assertTrue(driver.sendMessage(new SimpleMessage("foo"))); - - assertNotNull(requestHandler.awaitRequest()); - ContentChannel content = requestHandler.responseHandler.handleResponse(new Response(Response.Status.OK)); - assertNotNull(content); - try { - content.write(ByteBuffer.allocate(69), null); - fail(); - } catch (UnsupportedOperationException e) { - - } - content.close(null); - - assertNotNull(driver.awaitSuccess()); - assertTrue(driver.close()); - } - - @Test - public void requireThatResponseErrorCodeDoesNotDuplicateReplyError() { - assertError(Collections.<Integer>emptyList(), - Response.Status.OK); - assertError(Arrays.asList(ErrorCode.APP_FATAL_ERROR), - Response.Status.BAD_REQUEST); - assertError(Arrays.asList(ErrorCode.FATAL_ERROR), - Response.Status.BAD_REQUEST, ErrorCode.FATAL_ERROR); - assertError(Arrays.asList(ErrorCode.TRANSIENT_ERROR, ErrorCode.APP_FATAL_ERROR), - Response.Status.BAD_REQUEST, ErrorCode.TRANSIENT_ERROR); - assertError(Arrays.asList(ErrorCode.FATAL_ERROR, ErrorCode.TRANSIENT_ERROR), - Response.Status.BAD_REQUEST, ErrorCode.FATAL_ERROR, ErrorCode.TRANSIENT_ERROR); - } - - private static void assertError(List<Integer> expectedErrors, int responseStatus, int... responseErrors) { - MyRequestHandler requestHandler = MyRequestHandler.newInstance(); - ServerTestDriver driver = ServerTestDriver.newInstance(requestHandler, true); - assertTrue(driver.sendMessage(new SimpleMessage("foo"))); - - assertNotNull(requestHandler.awaitRequest()); - Reply reply = new SimpleReply("bar"); - reply.swapState(((MbusRequest)requestHandler.request).getMessage()); - for (int err : responseErrors) { - reply.addError(new Error(err, "err")); - } - assertTrue(requestHandler.sendResponse(new MbusResponse(responseStatus, reply))); - - assertNotNull(reply = driver.awaitReply()); - List<Integer> actual = new LinkedList<>(); - for (int i = 0; i < reply.getNumErrors(); ++i) { - actual.add(reply.getError(i).getCode()); - } - assertEquals(expectedErrors, actual); - assertTrue(driver.close()); - } - - private static class MySelector extends AbstractModule implements BindingSetSelector { - - final String bindingSet; - - MySelector(String bindingSet) { - this.bindingSet = bindingSet; - } - - @Override - protected void configure() { - bind(BindingSetSelector.class).toInstance(this); - } - - @Override - public String select(URI uri) { - return bindingSet; - } - } - - private static class MyRequestHandler extends AbstractRequestHandler { - - final MyRequestContent content; - Request request; - ResponseHandler responseHandler; - - MyRequestHandler(MyRequestContent content) { - this.content = content; - } - - @Override - public ContentChannel handleRequest(Request request, ResponseHandler responseHandler) { - this.request = request; - this.responseHandler = responseHandler; - if (content == null) { - throw new RequestDeniedException(request); - } - return content; - } - - Request awaitRequest() { - try { - if (!content.closeLatch.await(60, TimeUnit.SECONDS)) { - return null; - } - } catch (InterruptedException e) { - throw new IllegalStateException(e); - } - if (request instanceof MbusRequest) { - ((MbusRequest)request).getMessage().getTrace().trace(0, "Request received by DISC."); - } - return request; - } - - boolean sendResponse(Response response) { - ContentChannel content = responseHandler.handleResponse(response); - if (content == null) { - return false; - } - content.close(null); - return true; - } - - static MyRequestHandler newInstance() { - return new MyRequestHandler(new MyRequestContent()); - } - - static MyRequestHandler newRequestDenied() { - return new MyRequestHandler(null); - } - } - - private static class MyRequestContent implements ContentChannel { - - final CountDownLatch writeLatch = new CountDownLatch(1); - final CountDownLatch closeLatch = new CountDownLatch(1); - - @Override - public void write(ByteBuffer buf, CompletionHandler handler) { - if (handler != null) { - handler.completed(); - } - writeLatch.countDown(); - } - - @Override - public void close(CompletionHandler handler) { - if (handler != null) { - handler.completed(); - } - closeLatch.countDown(); - } - } - - private static class MyCompletion implements CompletionHandler { - - final CountDownLatch completedLatch = new CountDownLatch(1); - - @Override - public void completed() { - completedLatch.countDown(); - } - - @Override - public void failed(Throwable t) { - - } - } - - private static class MySession implements ServerSession { - - int refCount = 1; - - @Override - public void sendReply(Reply reply) { - - } - - @Override - public MessageHandler getMessageHandler() { - return null; - } - - @Override - public void setMessageHandler(MessageHandler msgHandler) { - - } - - @Override - public String connectionSpec() { - return null; - } - - @Override - public String name() { - return null; - } - - @Override - public ResourceReference refer() { - ++refCount; - return new ResourceReference() { - @Override - public void close() { - --refCount; - } - }; - } - - @Override - public void release() { - --refCount; - } - } -} diff --git a/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/ServerThreadingTestCase.java b/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/ServerThreadingTestCase.java deleted file mode 100644 index a7ee355094f..00000000000 --- a/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/ServerThreadingTestCase.java +++ /dev/null @@ -1,137 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.messagebus.jdisc; - -import com.yahoo.jdisc.application.ContainerBuilder; -import com.yahoo.jdisc.service.CurrentContainer; -import com.yahoo.jdisc.test.TestDriver; -import com.yahoo.messagebus.DestinationSessionParams; -import com.yahoo.messagebus.EmptyReply; -import com.yahoo.messagebus.Message; -import com.yahoo.messagebus.MessageBus; -import com.yahoo.messagebus.MessageBusParams; -import com.yahoo.messagebus.Reply; -import com.yahoo.messagebus.ReplyHandler; -import com.yahoo.messagebus.SourceSession; -import com.yahoo.messagebus.SourceSessionParams; -import com.yahoo.messagebus.network.local.LocalNetwork; -import com.yahoo.messagebus.network.local.LocalWire; -import com.yahoo.messagebus.routing.Route; -import com.yahoo.messagebus.shared.SharedDestinationSession; -import com.yahoo.messagebus.shared.SharedMessageBus; -import com.yahoo.messagebus.test.SimpleMessage; -import com.yahoo.messagebus.test.SimpleProtocol; -import org.junit.Test; - -import java.util.concurrent.BlockingDeque; -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.TimeUnit; - -import static org.hamcrest.CoreMatchers.instanceOf; -import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.MatcherAssert.assertThat; - -/** - * @author Simon Thoresen Hult - */ -public class ServerThreadingTestCase { - - private static final int NUM_THREADS = 32; - private static final int NUM_REQUESTS = 1000; - - @Test - public void requireThatServerIsThreadSafe() throws Exception { - final LocalWire wire = new LocalWire(); - final Client client = new Client(wire); - final Server server = new Server(wire); - - for (int i = 0; i < NUM_REQUESTS; ++i) { - final Message msg = new SimpleMessage("foo"); - msg.setRoute(Route.parse(server.delegate.connectionSpec())); - msg.pushHandler(client); - assertThat(client.session.send(msg).isAccepted(), is(true)); - } - for (int i = 0; i < NUM_REQUESTS; ++i) { - final Reply reply = client.replies.poll(600, TimeUnit.SECONDS); - assertThat(reply, instanceOf(EmptyReply.class)); - assertThat(reply.hasErrors(), is(false)); - } - - assertThat(client.close(), is(true)); - assertThat(server.close(), is(true)); - } - - private static class Client implements ReplyHandler { - - final BlockingDeque<Reply> replies = new LinkedBlockingDeque<>(); - final MessageBus mbus; - final SourceSession session; - - Client(final LocalWire wire) { - mbus = new MessageBus( - new LocalNetwork(wire), - new MessageBusParams().addProtocol(new SimpleProtocol())); - session = mbus.createSourceSession( - new SourceSessionParams() - .setReplyHandler(this) - .setThrottlePolicy(null)); - } - - @Override - public void handleReply(final Reply reply) { - replies.addLast(reply); - } - - boolean close() { - return session.destroy() && mbus.destroy(); - } - } - - private static class Server extends MbusRequestHandler { - - final Executor executor = Executors.newFixedThreadPool(NUM_THREADS); - final MbusServer delegate; - final TestDriver driver; - - Server(final LocalWire wire) { - driver = TestDriver.newSimpleApplicationInstanceWithoutOsgi(); - delegate = newMbusServer(driver, wire); - - final ContainerBuilder builder = driver.newContainerBuilder(); - builder.serverBindings().bind("mbus://*/*", this); - driver.activateContainer(builder); - delegate.start(); - } - - @Override - public void handleMessage(final Message msg) { - executor.execute(new Runnable() { - - @Override - public void run() { - final Reply reply = new EmptyReply(); - reply.swapState(msg); - reply.popHandler().handleReply(reply); - } - }); - } - - boolean close() { - delegate.release(); - return driver.close(); - } - } - - private static MbusServer newMbusServer(final CurrentContainer container, final LocalWire wire) { - final SharedMessageBus mbus = new SharedMessageBus(new MessageBus( - new LocalNetwork(wire), - new MessageBusParams().addProtocol(new SimpleProtocol()))); - final SharedDestinationSession session = mbus.newDestinationSession( - new DestinationSessionParams()); - final MbusServer server = new MbusServer(container, session); - session.release(); - mbus.release(); - return server; - } -} diff --git a/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/test/ClientTestDriverTestCase.java b/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/test/ClientTestDriverTestCase.java deleted file mode 100644 index ef290a070cb..00000000000 --- a/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/test/ClientTestDriverTestCase.java +++ /dev/null @@ -1,32 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.messagebus.jdisc.test; - -import com.yahoo.jrt.ListenFailedException; -import com.yahoo.jrt.slobrok.server.Slobrok; -import com.yahoo.messagebus.test.SimpleProtocol; -import org.junit.Test; - -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; - -/** - * @author Simon Thoresen Hult - */ -public class ClientTestDriverTestCase { - - @Test - public void requireThatFactoryMethodsWork() throws ListenFailedException { - ClientTestDriver driver = ClientTestDriver.newInstance(); - assertNotNull(driver); - assertTrue(driver.close()); - - driver = ClientTestDriver.newInstanceWithProtocol(new SimpleProtocol()); - assertNotNull(driver); - assertTrue(driver.close()); - - Slobrok slobrok = new Slobrok(); - driver = ClientTestDriver.newInstanceWithExternSlobrok(slobrok.configId()); - assertNotNull(driver); - assertTrue(driver.close()); - } -} diff --git a/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/test/ServerTestDriverTestCase.java b/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/test/ServerTestDriverTestCase.java deleted file mode 100644 index f6ae2335d12..00000000000 --- a/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/test/ServerTestDriverTestCase.java +++ /dev/null @@ -1,34 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.messagebus.jdisc.test; - -import com.yahoo.jdisc.test.NonWorkingRequestHandler; -import com.yahoo.jrt.ListenFailedException; -import com.yahoo.jrt.slobrok.server.Slobrok; -import com.yahoo.messagebus.test.SimpleProtocol; -import org.junit.Test; - -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; - -/** - * @author Simon Thoresen Hult - */ -public class ServerTestDriverTestCase { - - @Test - public void requireThatFactoryMethodsWork() throws ListenFailedException { - ServerTestDriver driver = ServerTestDriver.newInstance(new NonWorkingRequestHandler(), false); - assertNotNull(driver); - assertTrue(driver.close()); - - driver = ServerTestDriver.newInstanceWithProtocol(new SimpleProtocol(), new NonWorkingRequestHandler(), false); - assertNotNull(driver); - assertTrue(driver.close()); - - Slobrok slobrok = new Slobrok(); - driver = ServerTestDriver.newInstanceWithExternSlobrok(slobrok.configId(), new NonWorkingRequestHandler(), false); - assertNotNull(driver); - assertTrue(driver.close()); - } - -} diff --git a/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/shared/SharedDestinationSessionTestCase.java b/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/shared/SharedDestinationSessionTestCase.java deleted file mode 100644 index 78e79da4b9f..00000000000 --- a/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/shared/SharedDestinationSessionTestCase.java +++ /dev/null @@ -1,134 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.messagebus.shared; - -import com.yahoo.jrt.ListenFailedException; -import com.yahoo.jrt.slobrok.server.Slobrok; -import com.yahoo.messagebus.*; -import com.yahoo.messagebus.jdisc.test.MessageQueue; -import com.yahoo.messagebus.jdisc.test.RemoteClient; -import com.yahoo.messagebus.jdisc.test.ReplyQueue; -import com.yahoo.messagebus.network.rpc.RPCNetworkParams; -import com.yahoo.messagebus.routing.Route; -import com.yahoo.messagebus.test.SimpleMessage; -import com.yahoo.messagebus.test.SimpleProtocol; -import com.yahoo.messagebus.test.SimpleReply; -import org.junit.Test; - -import java.util.concurrent.TimeUnit; - -import static org.junit.Assert.*; - -/** - * @author Simon Thoresen Hult - */ -public class SharedDestinationSessionTestCase { - - @Test - public void requireThatMessageHandlerCanBeAccessed() { - SharedDestinationSession session = newDestinationSession(); - assertNull(session.getMessageHandler()); - - MessageQueue handler = new MessageQueue(); - session.setMessageHandler(handler); - assertSame(handler, session.getMessageHandler()); - } - - @Test - public void requireThatMessageHandlerCanOnlyBeSetOnce() { - SharedDestinationSession session = newDestinationSession(); - session.setMessageHandler(new MessageQueue()); - try { - session.setMessageHandler(new MessageQueue()); - fail(); - } catch (IllegalStateException e) { - assertEquals("Message handler already registered.", e.getMessage()); - } - session.release(); - } - - @Test - public void requireThatMessageHandlerIsCalled() throws InterruptedException { - SharedDestinationSession session = newDestinationSession(); - MessageQueue queue = new MessageQueue(); - session.setMessageHandler(queue); - session.handleMessage(new SimpleMessage("foo")); - assertNotNull(queue.awaitMessage(60, TimeUnit.SECONDS)); - session.release(); - } - - @Test - public void requireThatSessionRepliesIfMessageHandlerIsNull() throws InterruptedException { - SharedDestinationSession session = newDestinationSession(); - Message msg = new SimpleMessage("foo"); - ReplyQueue queue = new ReplyQueue(); - msg.pushHandler(queue); - session.handleMessage(msg); - Reply reply = queue.awaitReply(60, TimeUnit.SECONDS); - assertNotNull(reply); - assertEquals(1, reply.getNumErrors()); - assertEquals(ErrorCode.SESSION_BUSY, reply.getError(0).getCode()); - session.release(); - } - - @Test - public void requireThatSessionIsClosedOnDestroy() { - SharedDestinationSession session = newDestinationSession(); - session.release(); - assertFalse("DestinationSession not destroyed by release().", session.session().destroy()); - } - - @Test - public void requireThatMbusIsReleasedOnDestroy() { - Slobrok slobrok = null; - try { - slobrok = new Slobrok(); - } catch (ListenFailedException e) { - fail(); - } - RPCNetworkParams netParams = new RPCNetworkParams().setSlobrokConfigId(slobrok.configId()); - SharedMessageBus mbus = SharedMessageBus.newInstance(new MessageBusParams(), netParams); - SharedDestinationSession session = mbus.newDestinationSession(new DestinationSessionParams()); - mbus.release(); - session.release(); - assertFalse("MessageBus not destroyed by release().", mbus.messageBus().destroy()); - } - - @Test - public void requireThatSessionCanSendReply() throws InterruptedException { - RemoteClient client = RemoteClient.newInstanceWithInternSlobrok(true); - MessageQueue queue = new MessageQueue(); - DestinationSessionParams params = new DestinationSessionParams().setMessageHandler(queue); - SharedDestinationSession session = newDestinationSession(client.slobrokId(), params); - Route route = Route.parse(session.connectionSpec()); - - assertTrue(client.sendMessage(new SimpleMessage("foo").setRoute(route)).isAccepted()); - Message msg = queue.awaitMessage(60, TimeUnit.SECONDS); - assertNotNull(msg); - Reply reply = new SimpleReply("bar"); - reply.swapState(msg); - session.sendReply(reply); - assertNotNull(client.awaitReply(60, TimeUnit.SECONDS)); - - session.release(); - client.close(); - } - - private static SharedDestinationSession newDestinationSession() { - Slobrok slobrok = null; - try { - slobrok = new Slobrok(); - } catch (ListenFailedException e) { - fail(); - } - return newDestinationSession(slobrok.configId(), new DestinationSessionParams()); - } - - private static SharedDestinationSession newDestinationSession(String slobrokId, DestinationSessionParams params) { - RPCNetworkParams netParams = new RPCNetworkParams().setSlobrokConfigId(slobrokId); - MessageBusParams mbusParams = new MessageBusParams().addProtocol(new SimpleProtocol()); - SharedMessageBus mbus = SharedMessageBus.newInstance(mbusParams, netParams); - SharedDestinationSession session = mbus.newDestinationSession(params); - mbus.release(); - return session; - } -} diff --git a/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/shared/SharedIntermediateSessionTestCase.java b/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/shared/SharedIntermediateSessionTestCase.java deleted file mode 100644 index 87958415149..00000000000 --- a/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/shared/SharedIntermediateSessionTestCase.java +++ /dev/null @@ -1,174 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.messagebus.shared; - -import com.yahoo.jrt.ListenFailedException; -import com.yahoo.jrt.slobrok.server.Slobrok; -import com.yahoo.messagebus.*; -import com.yahoo.messagebus.jdisc.test.MessageQueue; -import com.yahoo.messagebus.jdisc.test.RemoteClient; -import com.yahoo.messagebus.jdisc.test.RemoteServer; -import com.yahoo.messagebus.jdisc.test.ReplyQueue; -import com.yahoo.messagebus.network.local.LocalNetwork; -import com.yahoo.messagebus.network.local.LocalWire; -import com.yahoo.messagebus.network.rpc.RPCNetworkParams; -import com.yahoo.messagebus.routing.Route; -import com.yahoo.messagebus.test.SimpleMessage; -import com.yahoo.messagebus.test.SimpleProtocol; -import com.yahoo.messagebus.test.SimpleReply; -import org.junit.Test; - -import java.util.concurrent.TimeUnit; - -import static org.junit.Assert.*; - -/** - * @author Simon Thoresen Hult - */ -public class SharedIntermediateSessionTestCase { - - @Test - public void requireThatMessageHandlerCanBeAccessed() { - SharedIntermediateSession session = newIntermediateSession(false); - assertNull(session.getMessageHandler()); - - MessageQueue handler = new MessageQueue(); - session.setMessageHandler(handler); - assertSame(handler, session.getMessageHandler()); - } - - @Test - public void requireThatMessageHandlerCanOnlyBeSetOnce() { - SharedIntermediateSession session = newIntermediateSession(false); - session.setMessageHandler(new MessageQueue()); - try { - session.setMessageHandler(new MessageQueue()); - fail(); - } catch (IllegalStateException e) { - assertEquals("Message handler already registered.", e.getMessage()); - } - session.release(); - } - - @Test - public void requireThatMessageHandlerIsCalled() throws InterruptedException { - SharedIntermediateSession session = newIntermediateSession(false); - MessageQueue queue = new MessageQueue(); - session.setMessageHandler(queue); - session.handleMessage(new SimpleMessage("foo")); - assertNotNull(queue.awaitMessage(60, TimeUnit.SECONDS)); - session.release(); - } - - @Test - public void requireThatSessionRepliesIfMessageHandlerIsNull() throws InterruptedException { - SharedIntermediateSession session = newIntermediateSession(false); - Message msg = new SimpleMessage("foo"); - ReplyQueue queue = new ReplyQueue(); - msg.pushHandler(queue); - session.handleMessage(msg); - Reply reply = queue.awaitReply(60, TimeUnit.SECONDS); - assertNotNull(reply); - assertEquals(1, reply.getNumErrors()); - assertEquals(ErrorCode.SESSION_BUSY, reply.getError(0).getCode()); - session.release(); - } - - @Test - public void requireThatReplyHandlerCanNotBeSet() throws ListenFailedException { - Slobrok slobrok = new Slobrok(); - try { - newIntermediateSession(slobrok.configId(), - new IntermediateSessionParams().setReplyHandler(new ReplyQueue()), - false); - fail(); - } catch (IllegalArgumentException e) { - assertEquals("Reply handler must be null.", e.getMessage()); - } - } - - @Test - public void requireThatSessionIsClosedOnDestroy() { - SharedIntermediateSession session = newIntermediateSession(false); - session.release(); - assertFalse("IntermediateSession not destroyed by release().", session.session().destroy()); - } - - @Test - public void requireThatMbusIsReleasedOnDestroy() { - try { - new Slobrok(); - } catch (ListenFailedException e) { - fail(); - } - SharedMessageBus mbus = new SharedMessageBus(new MessageBus(new LocalNetwork(new LocalWire()), new MessageBusParams())); - - SharedIntermediateSession session = mbus.newIntermediateSession(new IntermediateSessionParams()); - mbus.release(); - session.release(); - assertFalse("MessageBus not destroyed by release().", mbus.messageBus().destroy()); - } - - @Test - public void requireThatSessionCanSendMessage() throws InterruptedException { - RemoteServer server = RemoteServer.newInstanceWithInternSlobrok(); - SharedIntermediateSession session = newIntermediateSession(server.slobrokId(), - new IntermediateSessionParams(), - true); - ReplyQueue queue = new ReplyQueue(); - Message msg = new SimpleMessage("foo").setRoute(Route.parse(server.connectionSpec())); - msg.setTimeReceivedNow(); - msg.setTimeRemaining(60000); - msg.getTrace().setLevel(9); - msg.pushHandler(queue); - assertTrue(session.sendMessage(msg).isAccepted()); - assertNotNull(msg = server.awaitMessage(60, TimeUnit.SECONDS)); - server.ackMessage(msg); - assertNotNull(queue.awaitReply(60, TimeUnit.SECONDS)); - - session.release(); - server.close(); - } - - @Test - public void requireThatSessionCanSendReply() throws InterruptedException { - RemoteClient client = RemoteClient.newInstanceWithInternSlobrok(true); - MessageQueue queue = new MessageQueue(); - IntermediateSessionParams params = new IntermediateSessionParams().setMessageHandler(queue); - SharedIntermediateSession session = newIntermediateSession(client.slobrokId(), params, true); - Route route = Route.parse(session.connectionSpec()); - - assertTrue(client.sendMessage(new SimpleMessage("foo").setRoute(route)).isAccepted()); - Message msg = queue.awaitMessage(60, TimeUnit.SECONDS); - assertNotNull(msg); - Reply reply = new SimpleReply("bar"); - reply.swapState(msg); - session.sendReply(reply); - assertNotNull(client.awaitReply(60, TimeUnit.SECONDS)); - - session.release(); - client.close(); - } - - private static SharedIntermediateSession newIntermediateSession(boolean network) { - Slobrok slobrok = null; - try { - slobrok = new Slobrok(); - } catch (ListenFailedException e) { - fail(); - } - return newIntermediateSession(slobrok.configId(), new IntermediateSessionParams(), network); - } - - private static SharedIntermediateSession newIntermediateSession(String slobrokId, - IntermediateSessionParams params, - boolean network) { - RPCNetworkParams netParams = new RPCNetworkParams().setSlobrokConfigId(slobrokId); - MessageBusParams mbusParams = new MessageBusParams().addProtocol(new SimpleProtocol()); - SharedMessageBus mbus = network - ? SharedMessageBus.newInstance(mbusParams, netParams) - : new SharedMessageBus(new MessageBus(new LocalNetwork(new LocalWire()), mbusParams)); - SharedIntermediateSession session = mbus.newIntermediateSession(params); - mbus.release(); - return session; - } -} diff --git a/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/shared/SharedMessageBusTestCase.java b/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/shared/SharedMessageBusTestCase.java deleted file mode 100644 index a54489a89e6..00000000000 --- a/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/shared/SharedMessageBusTestCase.java +++ /dev/null @@ -1,37 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.messagebus.shared; - -import com.yahoo.jrt.ListenFailedException; -import com.yahoo.jrt.slobrok.server.Slobrok; -import com.yahoo.messagebus.MessageBusParams; -import com.yahoo.messagebus.network.rpc.RPCNetworkParams; -import org.junit.Test; - -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.fail; - -/** - * @author Simon Thoresen Hult - */ -public class SharedMessageBusTestCase { - - @Test - public void requireThatMbusCanNotBeNull() { - try { - new SharedMessageBus(null); - fail(); - } catch (NullPointerException e) { - // expected - } - } - - @Test - public void requireThatMbusIsClosedOnDestroy() throws ListenFailedException { - Slobrok slobrok = new Slobrok(); - SharedMessageBus mbus = SharedMessageBus.newInstance(new MessageBusParams(), - new RPCNetworkParams() - .setSlobrokConfigId(slobrok.configId())); - mbus.release(); - assertFalse(mbus.messageBus().destroy()); - } -} diff --git a/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/shared/SharedSourceSessionTestCase.java b/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/shared/SharedSourceSessionTestCase.java deleted file mode 100644 index 1f0966fc961..00000000000 --- a/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/shared/SharedSourceSessionTestCase.java +++ /dev/null @@ -1,94 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.messagebus.shared; - -import com.yahoo.jrt.ListenFailedException; -import com.yahoo.jrt.slobrok.server.Slobrok; -import com.yahoo.messagebus.Message; -import com.yahoo.messagebus.MessageBusParams; -import com.yahoo.messagebus.SourceSessionParams; -import com.yahoo.messagebus.jdisc.test.RemoteServer; -import com.yahoo.messagebus.jdisc.test.ReplyQueue; -import com.yahoo.messagebus.network.rpc.RPCNetworkParams; -import com.yahoo.messagebus.routing.Route; -import com.yahoo.messagebus.test.SimpleMessage; -import com.yahoo.messagebus.test.SimpleProtocol; -import org.junit.Test; - -import java.util.concurrent.TimeUnit; - -import static org.junit.Assert.*; - -/** - * @author Simon Thoresen Hult - */ -public class SharedSourceSessionTestCase { - - @Test - public void requireThatReplyHandlerCanNotBeSet() { - try { - newSourceSession(new SourceSessionParams().setReplyHandler(new ReplyQueue())); - fail(); - } catch (IllegalArgumentException e) { - assertEquals("Reply handler must be null.", e.getMessage()); - } - } - - @Test - public void requireThatSessionIsClosedOnDestroy() { - SharedSourceSession session = newSourceSession(new SourceSessionParams()); - session.release(); - assertFalse("SourceSession not destroyed by release().", session.session().destroy()); - } - - @Test - public void requireThatMbusIsReleasedOnDestroy() { - Slobrok slobrok = null; - try { - slobrok = new Slobrok(); - } catch (ListenFailedException e) { - fail(); - } - RPCNetworkParams netParams = new RPCNetworkParams().setSlobrokConfigId(slobrok.configId()); - SharedMessageBus mbus = SharedMessageBus.newInstance(new MessageBusParams(), netParams); - SharedSourceSession session = mbus.newSourceSession(new SourceSessionParams()); - mbus.release(); - session.release(); - assertFalse("MessageBus not destroyed by release().", mbus.messageBus().destroy()); - } - - @Test - public void requireThatSessionCanSendMessage() throws InterruptedException { - RemoteServer server = RemoteServer.newInstanceWithInternSlobrok(); - SharedSourceSession session = newSourceSession(server.slobrokId(), - new SourceSessionParams()); - ReplyQueue queue = new ReplyQueue(); - Message msg = new SimpleMessage("foo").setRoute(Route.parse(server.connectionSpec())); - msg.pushHandler(queue); - assertTrue(session.sendMessage(msg).isAccepted()); - assertNotNull(msg = server.awaitMessage(60, TimeUnit.SECONDS)); - server.ackMessage(msg); - assertNotNull(queue.awaitReply(60, TimeUnit.SECONDS)); - - session.release(); - server.close(); - } - - private static SharedSourceSession newSourceSession(SourceSessionParams params) { - Slobrok slobrok = null; - try { - slobrok = new Slobrok(); - } catch (ListenFailedException e) { - fail(); - } - return newSourceSession(slobrok.configId(), params); - } - - private static SharedSourceSession newSourceSession(String slobrokId, SourceSessionParams params) { - RPCNetworkParams netParams = new RPCNetworkParams().setSlobrokConfigId(slobrokId); - MessageBusParams mbusParams = new MessageBusParams().addProtocol(new SimpleProtocol()); - SharedMessageBus mbus = SharedMessageBus.newInstance(mbusParams, netParams); - SharedSourceSession session = mbus.newSourceSession(params); - mbus.release(); - return session; - } -} |