diff options
Diffstat (limited to 'container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test')
6 files changed, 505 insertions, 0 deletions
diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/ClientTestDriver.java b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/ClientTestDriver.java new file mode 100644 index 00000000000..111805d61b0 --- /dev/null +++ b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/ClientTestDriver.java @@ -0,0 +1,134 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.jdisc.test; + +import com.yahoo.jdisc.References; +import com.yahoo.jdisc.Request; +import com.yahoo.jdisc.ResourceReference; +import com.yahoo.jdisc.application.ContainerBuilder; +import com.yahoo.jdisc.handler.ResponseHandler; +import com.yahoo.jdisc.test.TestDriver; +import com.yahoo.messagebus.*; +import com.yahoo.messagebus.jdisc.MbusClient; +import com.yahoo.messagebus.jdisc.MbusRequest; +import com.yahoo.messagebus.network.rpc.RPCNetworkParams; +import com.yahoo.messagebus.routing.Route; +import com.yahoo.messagebus.shared.SharedMessageBus; +import com.yahoo.messagebus.shared.SharedSourceSession; +import com.yahoo.messagebus.test.SimpleProtocol; + +import java.net.URI; +import java.util.concurrent.TimeUnit; + +/** + * @author Simon Thoresen Hult + */ +public class ClientTestDriver { + + private final RemoteServer server; + private final MbusClient client; + private final SharedSourceSession session; + private final TestDriver driver; + + private ClientTestDriver(RemoteServer server, Protocol protocol) { + this.server = server; + + MessageBusParams mbusParams = new MessageBusParams().addProtocol(protocol); + RPCNetworkParams netParams = new RPCNetworkParams().setSlobrokConfigId(server.slobrokId()); + SharedMessageBus mbus = SharedMessageBus.newInstance(mbusParams, netParams); + session = mbus.newSourceSession(new SourceSessionParams()); + client = new MbusClient(session); + client.start(); + mbus.release(); + + driver = TestDriver.newSimpleApplicationInstanceWithoutOsgi(); + ContainerBuilder builder = driver.newContainerBuilder(); + builder.clientBindings().bind("mbus://*/*", client); + driver.activateContainer(builder); + } + + public SourceSession sourceSession() { + return session.session(); + } + + public Request newServerRequest() { + return new Request(driver, URI.create("mbus://localhost/")); + } + + public Request newClientRequest(Message msg) { + msg.setRoute(Route.parse(server.connectionSpec())); + if (msg.getTrace().getLevel() == 0) { + msg.getTrace().setLevel(9); + } + final Request parent = newServerRequest(); + try (final ResourceReference ref = References.fromResource(parent)) { + return new MbusRequest(parent, URI.create("mbus://remotehost/"), msg); + } + } + + public boolean sendRequest(Request request, ResponseHandler responseHandler) { + request.connect(responseHandler).close(null); + return true; + } + + public boolean sendMessage(Message msg, ResponseHandler responseHandler) { + final Request request = newClientRequest(msg); + try (final ResourceReference ref = References.fromResource(request)) { + return sendRequest(request, responseHandler); + } + } + + public Message awaitMessage() { + Message msg = null; + try { + msg = server.awaitMessage(60, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + if (msg != null) { + msg.getTrace().trace(0, "Message received by RemoteServer."); + } + return msg; + } + + public void sendReply(Reply reply) { + reply.getTrace().trace(0, "Sending reply from RemoteServer."); + server.sendReply(reply); + } + + public boolean awaitMessageAndSendReply(Reply reply) { + Message msg = awaitMessage(); + if (msg == null) { + return false; + } + reply.swapState(msg); + sendReply(reply); + return true; + } + + public boolean close() { + session.release(); + client.release(); + server.close(); + return driver.close(); + } + + public MbusClient client() { + return client; + } + + public RemoteServer server() { + return server; + } + + public static ClientTestDriver newInstance() { + return new ClientTestDriver(RemoteServer.newInstanceWithInternSlobrok(), new SimpleProtocol()); + } + + public static ClientTestDriver newInstanceWithProtocol(Protocol protocol) { + return new ClientTestDriver(RemoteServer.newInstanceWithInternSlobrok(), protocol); + } + + public static ClientTestDriver newInstanceWithExternSlobrok(String slobrokId) { + return new ClientTestDriver(RemoteServer.newInstanceWithExternSlobrok(slobrokId), new SimpleProtocol()); + } +} diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/MessageQueue.java b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/MessageQueue.java new file mode 100644 index 00000000000..c5287165e27 --- /dev/null +++ b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/MessageQueue.java @@ -0,0 +1,27 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.jdisc.test; + +import com.yahoo.messagebus.Message; +import com.yahoo.messagebus.MessageHandler; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +/** + * @author Simon Thoresen Hult + */ +public class MessageQueue implements MessageHandler { + + private final BlockingQueue<Message> queue = new LinkedBlockingQueue<>(); + + @Override + public void handleMessage(Message msg) { + queue.add(msg); + } + + public Message awaitMessage(int timeout, TimeUnit unit) throws InterruptedException { + return queue.poll(timeout, unit); + } + +} diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteClient.java b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteClient.java new file mode 100644 index 00000000000..57d0abd980b --- /dev/null +++ b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteClient.java @@ -0,0 +1,76 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.jdisc.test; + +import com.yahoo.jrt.ListenFailedException; +import com.yahoo.jrt.slobrok.server.Slobrok; +import com.yahoo.messagebus.*; +import com.yahoo.messagebus.network.local.LocalNetwork; +import com.yahoo.messagebus.network.rpc.RPCNetwork; +import com.yahoo.messagebus.network.rpc.RPCNetworkParams; +import com.yahoo.messagebus.test.SimpleProtocol; + +import java.util.concurrent.TimeUnit; + +/** + * @author Simon Thoresen Hult + */ +public class RemoteClient { + + private final Slobrok slobrok; + private final String slobrokId; + private final MessageBus mbus; + private final ReplyQueue queue = new ReplyQueue(); + private final SourceSession session; + + private RemoteClient(Slobrok slobrok, String slobrokId, Protocol protocol, boolean network) { + this.slobrok = slobrok; + this.slobrokId = slobrok != null ? slobrok.configId() : slobrokId; + mbus = network + ? new MessageBus(new RPCNetwork(new RPCNetworkParams().setSlobrokConfigId(this.slobrokId)), + new MessageBusParams().addProtocol(protocol)) + : new MessageBus(new LocalNetwork(), new MessageBusParams().addProtocol(protocol)); + session = mbus.createSourceSession(new SourceSessionParams().setThrottlePolicy(null).setReplyHandler(queue)); + } + + public Result sendMessage(Message msg) { + return session.send(msg); + } + + public Reply awaitReply(int timeout, TimeUnit unit) throws InterruptedException { + return queue.awaitReply(timeout, unit); + } + + public String slobrokId() { + return slobrokId; + } + + public void close() { + session.destroy(); + mbus.destroy(); + if (slobrok != null) { + slobrok.stop(); + } + } + + public static RemoteClient newInstanceWithInternSlobrok(boolean network) { + return new RemoteClient(newSlobrok(), null, new SimpleProtocol(), network); + } + + public static RemoteClient newInstanceWithExternSlobrok(String slobrokId, boolean network) { + return new RemoteClient(null, slobrokId, new SimpleProtocol(), network); + } + + public static RemoteClient newInstanceWithProtocolAndInternSlobrok(Protocol protocol, boolean network) { + return new RemoteClient(newSlobrok(), null, protocol, network); + } + + private static Slobrok newSlobrok() { + Slobrok slobrok; + try { + slobrok = new Slobrok(); + } catch (ListenFailedException e) { + throw new IllegalStateException(e); + } + return slobrok; + } +} diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteServer.java b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteServer.java new file mode 100644 index 00000000000..1f0f82c4903 --- /dev/null +++ b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteServer.java @@ -0,0 +1,87 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.jdisc.test; + +import com.yahoo.jrt.ListenFailedException; +import com.yahoo.jrt.slobrok.server.Slobrok; +import com.yahoo.messagebus.*; +import com.yahoo.messagebus.network.Identity; +import com.yahoo.messagebus.network.rpc.RPCNetwork; +import com.yahoo.messagebus.network.rpc.RPCNetworkParams; +import com.yahoo.messagebus.test.SimpleProtocol; + +import java.util.concurrent.TimeUnit; + +/** + * @author Simon Thoresen Hult + */ +public class RemoteServer { + + private final Slobrok slobrok; + private final String slobrokId; + private final MessageBus mbus; + private final MessageQueue queue = new MessageQueue(); + private final DestinationSession session; + + private RemoteServer(Slobrok slobrok, String slobrokId, Protocol protocol, String identity) { + this.slobrok = slobrok; + this.slobrokId = slobrok != null ? slobrok.configId() : slobrokId; + mbus = new MessageBus(new RPCNetwork(new RPCNetworkParams() + .setSlobrokConfigId(this.slobrokId) + .setIdentity(new Identity(identity))), + new MessageBusParams().addProtocol(protocol)); + session = mbus.createDestinationSession(new DestinationSessionParams().setMessageHandler(queue)); + } + + public String connectionSpec() { + return session.getConnectionSpec(); + } + + public Message awaitMessage(int timeout, TimeUnit unit) throws InterruptedException { + return queue.awaitMessage(timeout, unit); + } + + public void ackMessage(Message msg) { + session.acknowledge(msg); + } + + public void sendReply(Reply reply) { + session.reply(reply); + } + + public String slobrokId() { + return slobrokId; + } + + public void close() { + session.destroy(); + mbus.destroy(); + if (slobrok != null) { + slobrok.stop(); + } + } + + public static RemoteServer newInstanceWithInternSlobrok() { + return new RemoteServer(newSlobrok(), null, new SimpleProtocol(), "remote"); + } + + public static RemoteServer newInstanceWithExternSlobrok(String slobrokId) { + return new RemoteServer(null, slobrokId, new SimpleProtocol(), "remote"); + } + + public static RemoteServer newInstance(String slobrokId, String identity, Protocol protocol) { + return new RemoteServer(null, slobrokId, protocol, identity); + } + + public static RemoteServer newInstanceWithProtocol(Protocol protocol) { + return new RemoteServer(newSlobrok(), null, protocol, "remote"); + } + + private static Slobrok newSlobrok() { + try { + return new Slobrok(); + } catch (ListenFailedException e) { + throw new IllegalStateException(e); + } + } + +} diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/ReplyQueue.java b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/ReplyQueue.java new file mode 100644 index 00000000000..6c48aab5a7f --- /dev/null +++ b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/ReplyQueue.java @@ -0,0 +1,26 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.jdisc.test; + +import com.yahoo.messagebus.Reply; +import com.yahoo.messagebus.ReplyHandler; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +/** + * @author Simon Thoresen Hult + */ +public class ReplyQueue implements ReplyHandler { + + private final BlockingQueue<Reply> queue = new LinkedBlockingQueue<>(); + + @Override + public void handleReply(Reply reply) { + queue.add(reply); + } + + public Reply awaitReply(int timeout, TimeUnit unit) throws InterruptedException { + return queue.poll(timeout, unit); + } +} diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/ServerTestDriver.java b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/ServerTestDriver.java new file mode 100644 index 00000000000..e59db28e886 --- /dev/null +++ b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/ServerTestDriver.java @@ -0,0 +1,155 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.jdisc.test; + +import com.google.inject.Module; +import com.yahoo.jdisc.application.ContainerBuilder; +import com.yahoo.jdisc.handler.RequestHandler; +import com.yahoo.jdisc.test.TestDriver; +import com.yahoo.messagebus.*; +import com.yahoo.messagebus.Error; +import com.yahoo.messagebus.jdisc.MbusServer; +import com.yahoo.messagebus.network.rpc.RPCNetworkParams; +import com.yahoo.messagebus.routing.Route; +import com.yahoo.messagebus.shared.ServerSession; +import com.yahoo.messagebus.shared.SharedMessageBus; +import com.yahoo.messagebus.test.SimpleProtocol; + +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +/** + * @author Simon Thoresen Hult + */ +public class ServerTestDriver { + + private final RemoteClient client; + private final MbusServer server; + private final TestDriver driver; + + private ServerTestDriver(RemoteClient client, boolean activateContainer, RequestHandler requestHandler, + Protocol protocol, Module... guiceModules) + { + this.client = client; + driver = TestDriver.newSimpleApplicationInstanceWithoutOsgi(guiceModules); + if (activateContainer) { + ContainerBuilder builder = driver.newContainerBuilder(); + if (requestHandler != null) { + builder.serverBindings().bind("mbus://*/*", requestHandler); + } + driver.activateContainer(builder); + } + + MessageBusParams mbusParams = new MessageBusParams().addProtocol(protocol); + RPCNetworkParams netParams = new RPCNetworkParams().setSlobrokConfigId(client.slobrokId()); + SharedMessageBus mbus = SharedMessageBus.newInstance(mbusParams, netParams); + ServerSession session = mbus.newDestinationSession(new DestinationSessionParams()); + server = new MbusServer(driver, session); + server.start(); + session.release(); + mbus.release(); + } + + public boolean sendMessage(Message msg) { + msg.setRoute(Route.parse(server.connectionSpec())); + msg.getTrace().setLevel(9); + return client.sendMessage(msg).isAccepted(); + } + + public Reply awaitReply() { + Reply reply = null; + try { + reply = client.awaitReply(60, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + if (reply != null) { + System.out.println(reply.getTrace()); + } + return reply; + } + + public Reply awaitSuccess() { + Reply reply = awaitReply(); + if (reply == null || reply.hasErrors()) { + return null; + } + return reply; + } + + public Reply awaitErrors(Integer... errCodes) { + Reply reply = awaitReply(); + if (reply == null) { + return null; + } + List<Integer> lst = new LinkedList<>(Arrays.asList(errCodes)); + for (int i = 0, len = reply.getNumErrors(); i < len; ++i) { + Error err = reply.getError(i); + System.out.println(err); + int idx = lst.indexOf(err.getCode()); + if (idx < 0) { + return null; + } + lst.remove(idx); + } + if (!lst.isEmpty()) { + return null; + } + return reply; + } + + public boolean close() { + server.close(); + server.release(); + client.close(); + return driver.close(); + } + + public TestDriver parent() { + return driver; + } + + public RemoteClient client() { + return client; + } + + public MbusServer server() { + return server; + } + + public static ServerTestDriver newInstance(RequestHandler requestHandler, boolean network, Module... guiceModules) { + return new ServerTestDriver(RemoteClient.newInstanceWithInternSlobrok(network), true, requestHandler, + new SimpleProtocol(), guiceModules); + } + + public static ServerTestDriver newInstanceWithProtocol(Protocol protocol, RequestHandler requestHandler, + boolean network, Module... guiceModules) + { + return new ServerTestDriver(RemoteClient.newInstanceWithInternSlobrok(network), true, requestHandler, protocol, + guiceModules); + } + + public static ServerTestDriver newInstanceWithExternSlobrok(String slobrokId, RequestHandler requestHandler, + boolean network, Module... guiceModules) + { + return new ServerTestDriver(RemoteClient.newInstanceWithExternSlobrok(slobrokId, network), + true, requestHandler, new SimpleProtocol(), guiceModules); + } + + public static ServerTestDriver newInactiveInstance(boolean network, Module... guiceModules) { + return new ServerTestDriver(RemoteClient.newInstanceWithInternSlobrok(network), false, null, + new SimpleProtocol(), guiceModules); + } + + public static ServerTestDriver newInactiveInstanceWithProtocol(Protocol protocol, boolean network, Module... guiceModules) { + return new ServerTestDriver(RemoteClient.newInstanceWithProtocolAndInternSlobrok(protocol, network), false, null, + protocol, guiceModules); + } + + public static ServerTestDriver newUnboundInstance(boolean network, Module... guiceModules) { + return new ServerTestDriver(RemoteClient.newInstanceWithInternSlobrok(network), true, null, + new SimpleProtocol(), guiceModules); + } + +} |