aboutsummaryrefslogtreecommitdiffstats
path: root/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test
diff options
context:
space:
mode:
Diffstat (limited to 'container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test')
-rw-r--r--container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/ClientTestDriver.java134
-rw-r--r--container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/MessageQueue.java27
-rw-r--r--container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteClient.java76
-rw-r--r--container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteServer.java87
-rw-r--r--container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/ReplyQueue.java26
-rw-r--r--container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/ServerTestDriver.java155
6 files changed, 505 insertions, 0 deletions
diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/ClientTestDriver.java b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/ClientTestDriver.java
new file mode 100644
index 00000000000..111805d61b0
--- /dev/null
+++ b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/ClientTestDriver.java
@@ -0,0 +1,134 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.jdisc.test;
+
+import com.yahoo.jdisc.References;
+import com.yahoo.jdisc.Request;
+import com.yahoo.jdisc.ResourceReference;
+import com.yahoo.jdisc.application.ContainerBuilder;
+import com.yahoo.jdisc.handler.ResponseHandler;
+import com.yahoo.jdisc.test.TestDriver;
+import com.yahoo.messagebus.*;
+import com.yahoo.messagebus.jdisc.MbusClient;
+import com.yahoo.messagebus.jdisc.MbusRequest;
+import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
+import com.yahoo.messagebus.routing.Route;
+import com.yahoo.messagebus.shared.SharedMessageBus;
+import com.yahoo.messagebus.shared.SharedSourceSession;
+import com.yahoo.messagebus.test.SimpleProtocol;
+
+import java.net.URI;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author Simon Thoresen Hult
+ */
+public class ClientTestDriver {
+
+ private final RemoteServer server;
+ private final MbusClient client;
+ private final SharedSourceSession session;
+ private final TestDriver driver;
+
+ private ClientTestDriver(RemoteServer server, Protocol protocol) {
+ this.server = server;
+
+ MessageBusParams mbusParams = new MessageBusParams().addProtocol(protocol);
+ RPCNetworkParams netParams = new RPCNetworkParams().setSlobrokConfigId(server.slobrokId());
+ SharedMessageBus mbus = SharedMessageBus.newInstance(mbusParams, netParams);
+ session = mbus.newSourceSession(new SourceSessionParams());
+ client = new MbusClient(session);
+ client.start();
+ mbus.release();
+
+ driver = TestDriver.newSimpleApplicationInstanceWithoutOsgi();
+ ContainerBuilder builder = driver.newContainerBuilder();
+ builder.clientBindings().bind("mbus://*/*", client);
+ driver.activateContainer(builder);
+ }
+
+ public SourceSession sourceSession() {
+ return session.session();
+ }
+
+ public Request newServerRequest() {
+ return new Request(driver, URI.create("mbus://localhost/"));
+ }
+
+ public Request newClientRequest(Message msg) {
+ msg.setRoute(Route.parse(server.connectionSpec()));
+ if (msg.getTrace().getLevel() == 0) {
+ msg.getTrace().setLevel(9);
+ }
+ final Request parent = newServerRequest();
+ try (final ResourceReference ref = References.fromResource(parent)) {
+ return new MbusRequest(parent, URI.create("mbus://remotehost/"), msg);
+ }
+ }
+
+ public boolean sendRequest(Request request, ResponseHandler responseHandler) {
+ request.connect(responseHandler).close(null);
+ return true;
+ }
+
+ public boolean sendMessage(Message msg, ResponseHandler responseHandler) {
+ final Request request = newClientRequest(msg);
+ try (final ResourceReference ref = References.fromResource(request)) {
+ return sendRequest(request, responseHandler);
+ }
+ }
+
+ public Message awaitMessage() {
+ Message msg = null;
+ try {
+ msg = server.awaitMessage(60, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ if (msg != null) {
+ msg.getTrace().trace(0, "Message received by RemoteServer.");
+ }
+ return msg;
+ }
+
+ public void sendReply(Reply reply) {
+ reply.getTrace().trace(0, "Sending reply from RemoteServer.");
+ server.sendReply(reply);
+ }
+
+ public boolean awaitMessageAndSendReply(Reply reply) {
+ Message msg = awaitMessage();
+ if (msg == null) {
+ return false;
+ }
+ reply.swapState(msg);
+ sendReply(reply);
+ return true;
+ }
+
+ public boolean close() {
+ session.release();
+ client.release();
+ server.close();
+ return driver.close();
+ }
+
+ public MbusClient client() {
+ return client;
+ }
+
+ public RemoteServer server() {
+ return server;
+ }
+
+ public static ClientTestDriver newInstance() {
+ return new ClientTestDriver(RemoteServer.newInstanceWithInternSlobrok(), new SimpleProtocol());
+ }
+
+ public static ClientTestDriver newInstanceWithProtocol(Protocol protocol) {
+ return new ClientTestDriver(RemoteServer.newInstanceWithInternSlobrok(), protocol);
+ }
+
+ public static ClientTestDriver newInstanceWithExternSlobrok(String slobrokId) {
+ return new ClientTestDriver(RemoteServer.newInstanceWithExternSlobrok(slobrokId), new SimpleProtocol());
+ }
+}
diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/MessageQueue.java b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/MessageQueue.java
new file mode 100644
index 00000000000..c5287165e27
--- /dev/null
+++ b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/MessageQueue.java
@@ -0,0 +1,27 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.jdisc.test;
+
+import com.yahoo.messagebus.Message;
+import com.yahoo.messagebus.MessageHandler;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author Simon Thoresen Hult
+ */
+public class MessageQueue implements MessageHandler {
+
+ private final BlockingQueue<Message> queue = new LinkedBlockingQueue<>();
+
+ @Override
+ public void handleMessage(Message msg) {
+ queue.add(msg);
+ }
+
+ public Message awaitMessage(int timeout, TimeUnit unit) throws InterruptedException {
+ return queue.poll(timeout, unit);
+ }
+
+}
diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteClient.java b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteClient.java
new file mode 100644
index 00000000000..57d0abd980b
--- /dev/null
+++ b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteClient.java
@@ -0,0 +1,76 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.jdisc.test;
+
+import com.yahoo.jrt.ListenFailedException;
+import com.yahoo.jrt.slobrok.server.Slobrok;
+import com.yahoo.messagebus.*;
+import com.yahoo.messagebus.network.local.LocalNetwork;
+import com.yahoo.messagebus.network.rpc.RPCNetwork;
+import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
+import com.yahoo.messagebus.test.SimpleProtocol;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author Simon Thoresen Hult
+ */
+public class RemoteClient {
+
+ private final Slobrok slobrok;
+ private final String slobrokId;
+ private final MessageBus mbus;
+ private final ReplyQueue queue = new ReplyQueue();
+ private final SourceSession session;
+
+ private RemoteClient(Slobrok slobrok, String slobrokId, Protocol protocol, boolean network) {
+ this.slobrok = slobrok;
+ this.slobrokId = slobrok != null ? slobrok.configId() : slobrokId;
+ mbus = network
+ ? new MessageBus(new RPCNetwork(new RPCNetworkParams().setSlobrokConfigId(this.slobrokId)),
+ new MessageBusParams().addProtocol(protocol))
+ : new MessageBus(new LocalNetwork(), new MessageBusParams().addProtocol(protocol));
+ session = mbus.createSourceSession(new SourceSessionParams().setThrottlePolicy(null).setReplyHandler(queue));
+ }
+
+ public Result sendMessage(Message msg) {
+ return session.send(msg);
+ }
+
+ public Reply awaitReply(int timeout, TimeUnit unit) throws InterruptedException {
+ return queue.awaitReply(timeout, unit);
+ }
+
+ public String slobrokId() {
+ return slobrokId;
+ }
+
+ public void close() {
+ session.destroy();
+ mbus.destroy();
+ if (slobrok != null) {
+ slobrok.stop();
+ }
+ }
+
+ public static RemoteClient newInstanceWithInternSlobrok(boolean network) {
+ return new RemoteClient(newSlobrok(), null, new SimpleProtocol(), network);
+ }
+
+ public static RemoteClient newInstanceWithExternSlobrok(String slobrokId, boolean network) {
+ return new RemoteClient(null, slobrokId, new SimpleProtocol(), network);
+ }
+
+ public static RemoteClient newInstanceWithProtocolAndInternSlobrok(Protocol protocol, boolean network) {
+ return new RemoteClient(newSlobrok(), null, protocol, network);
+ }
+
+ private static Slobrok newSlobrok() {
+ Slobrok slobrok;
+ try {
+ slobrok = new Slobrok();
+ } catch (ListenFailedException e) {
+ throw new IllegalStateException(e);
+ }
+ return slobrok;
+ }
+}
diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteServer.java b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteServer.java
new file mode 100644
index 00000000000..1f0f82c4903
--- /dev/null
+++ b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteServer.java
@@ -0,0 +1,87 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.jdisc.test;
+
+import com.yahoo.jrt.ListenFailedException;
+import com.yahoo.jrt.slobrok.server.Slobrok;
+import com.yahoo.messagebus.*;
+import com.yahoo.messagebus.network.Identity;
+import com.yahoo.messagebus.network.rpc.RPCNetwork;
+import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
+import com.yahoo.messagebus.test.SimpleProtocol;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author Simon Thoresen Hult
+ */
+public class RemoteServer {
+
+ private final Slobrok slobrok;
+ private final String slobrokId;
+ private final MessageBus mbus;
+ private final MessageQueue queue = new MessageQueue();
+ private final DestinationSession session;
+
+ private RemoteServer(Slobrok slobrok, String slobrokId, Protocol protocol, String identity) {
+ this.slobrok = slobrok;
+ this.slobrokId = slobrok != null ? slobrok.configId() : slobrokId;
+ mbus = new MessageBus(new RPCNetwork(new RPCNetworkParams()
+ .setSlobrokConfigId(this.slobrokId)
+ .setIdentity(new Identity(identity))),
+ new MessageBusParams().addProtocol(protocol));
+ session = mbus.createDestinationSession(new DestinationSessionParams().setMessageHandler(queue));
+ }
+
+ public String connectionSpec() {
+ return session.getConnectionSpec();
+ }
+
+ public Message awaitMessage(int timeout, TimeUnit unit) throws InterruptedException {
+ return queue.awaitMessage(timeout, unit);
+ }
+
+ public void ackMessage(Message msg) {
+ session.acknowledge(msg);
+ }
+
+ public void sendReply(Reply reply) {
+ session.reply(reply);
+ }
+
+ public String slobrokId() {
+ return slobrokId;
+ }
+
+ public void close() {
+ session.destroy();
+ mbus.destroy();
+ if (slobrok != null) {
+ slobrok.stop();
+ }
+ }
+
+ public static RemoteServer newInstanceWithInternSlobrok() {
+ return new RemoteServer(newSlobrok(), null, new SimpleProtocol(), "remote");
+ }
+
+ public static RemoteServer newInstanceWithExternSlobrok(String slobrokId) {
+ return new RemoteServer(null, slobrokId, new SimpleProtocol(), "remote");
+ }
+
+ public static RemoteServer newInstance(String slobrokId, String identity, Protocol protocol) {
+ return new RemoteServer(null, slobrokId, protocol, identity);
+ }
+
+ public static RemoteServer newInstanceWithProtocol(Protocol protocol) {
+ return new RemoteServer(newSlobrok(), null, protocol, "remote");
+ }
+
+ private static Slobrok newSlobrok() {
+ try {
+ return new Slobrok();
+ } catch (ListenFailedException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+}
diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/ReplyQueue.java b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/ReplyQueue.java
new file mode 100644
index 00000000000..6c48aab5a7f
--- /dev/null
+++ b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/ReplyQueue.java
@@ -0,0 +1,26 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.jdisc.test;
+
+import com.yahoo.messagebus.Reply;
+import com.yahoo.messagebus.ReplyHandler;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author Simon Thoresen Hult
+ */
+public class ReplyQueue implements ReplyHandler {
+
+ private final BlockingQueue<Reply> queue = new LinkedBlockingQueue<>();
+
+ @Override
+ public void handleReply(Reply reply) {
+ queue.add(reply);
+ }
+
+ public Reply awaitReply(int timeout, TimeUnit unit) throws InterruptedException {
+ return queue.poll(timeout, unit);
+ }
+}
diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/ServerTestDriver.java b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/ServerTestDriver.java
new file mode 100644
index 00000000000..e59db28e886
--- /dev/null
+++ b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/ServerTestDriver.java
@@ -0,0 +1,155 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.jdisc.test;
+
+import com.google.inject.Module;
+import com.yahoo.jdisc.application.ContainerBuilder;
+import com.yahoo.jdisc.handler.RequestHandler;
+import com.yahoo.jdisc.test.TestDriver;
+import com.yahoo.messagebus.*;
+import com.yahoo.messagebus.Error;
+import com.yahoo.messagebus.jdisc.MbusServer;
+import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
+import com.yahoo.messagebus.routing.Route;
+import com.yahoo.messagebus.shared.ServerSession;
+import com.yahoo.messagebus.shared.SharedMessageBus;
+import com.yahoo.messagebus.test.SimpleProtocol;
+
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author Simon Thoresen Hult
+ */
+public class ServerTestDriver {
+
+ private final RemoteClient client;
+ private final MbusServer server;
+ private final TestDriver driver;
+
+ private ServerTestDriver(RemoteClient client, boolean activateContainer, RequestHandler requestHandler,
+ Protocol protocol, Module... guiceModules)
+ {
+ this.client = client;
+ driver = TestDriver.newSimpleApplicationInstanceWithoutOsgi(guiceModules);
+ if (activateContainer) {
+ ContainerBuilder builder = driver.newContainerBuilder();
+ if (requestHandler != null) {
+ builder.serverBindings().bind("mbus://*/*", requestHandler);
+ }
+ driver.activateContainer(builder);
+ }
+
+ MessageBusParams mbusParams = new MessageBusParams().addProtocol(protocol);
+ RPCNetworkParams netParams = new RPCNetworkParams().setSlobrokConfigId(client.slobrokId());
+ SharedMessageBus mbus = SharedMessageBus.newInstance(mbusParams, netParams);
+ ServerSession session = mbus.newDestinationSession(new DestinationSessionParams());
+ server = new MbusServer(driver, session);
+ server.start();
+ session.release();
+ mbus.release();
+ }
+
+ public boolean sendMessage(Message msg) {
+ msg.setRoute(Route.parse(server.connectionSpec()));
+ msg.getTrace().setLevel(9);
+ return client.sendMessage(msg).isAccepted();
+ }
+
+ public Reply awaitReply() {
+ Reply reply = null;
+ try {
+ reply = client.awaitReply(60, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ if (reply != null) {
+ System.out.println(reply.getTrace());
+ }
+ return reply;
+ }
+
+ public Reply awaitSuccess() {
+ Reply reply = awaitReply();
+ if (reply == null || reply.hasErrors()) {
+ return null;
+ }
+ return reply;
+ }
+
+ public Reply awaitErrors(Integer... errCodes) {
+ Reply reply = awaitReply();
+ if (reply == null) {
+ return null;
+ }
+ List<Integer> lst = new LinkedList<>(Arrays.asList(errCodes));
+ for (int i = 0, len = reply.getNumErrors(); i < len; ++i) {
+ Error err = reply.getError(i);
+ System.out.println(err);
+ int idx = lst.indexOf(err.getCode());
+ if (idx < 0) {
+ return null;
+ }
+ lst.remove(idx);
+ }
+ if (!lst.isEmpty()) {
+ return null;
+ }
+ return reply;
+ }
+
+ public boolean close() {
+ server.close();
+ server.release();
+ client.close();
+ return driver.close();
+ }
+
+ public TestDriver parent() {
+ return driver;
+ }
+
+ public RemoteClient client() {
+ return client;
+ }
+
+ public MbusServer server() {
+ return server;
+ }
+
+ public static ServerTestDriver newInstance(RequestHandler requestHandler, boolean network, Module... guiceModules) {
+ return new ServerTestDriver(RemoteClient.newInstanceWithInternSlobrok(network), true, requestHandler,
+ new SimpleProtocol(), guiceModules);
+ }
+
+ public static ServerTestDriver newInstanceWithProtocol(Protocol protocol, RequestHandler requestHandler,
+ boolean network, Module... guiceModules)
+ {
+ return new ServerTestDriver(RemoteClient.newInstanceWithInternSlobrok(network), true, requestHandler, protocol,
+ guiceModules);
+ }
+
+ public static ServerTestDriver newInstanceWithExternSlobrok(String slobrokId, RequestHandler requestHandler,
+ boolean network, Module... guiceModules)
+ {
+ return new ServerTestDriver(RemoteClient.newInstanceWithExternSlobrok(slobrokId, network),
+ true, requestHandler, new SimpleProtocol(), guiceModules);
+ }
+
+ public static ServerTestDriver newInactiveInstance(boolean network, Module... guiceModules) {
+ return new ServerTestDriver(RemoteClient.newInstanceWithInternSlobrok(network), false, null,
+ new SimpleProtocol(), guiceModules);
+ }
+
+ public static ServerTestDriver newInactiveInstanceWithProtocol(Protocol protocol, boolean network, Module... guiceModules) {
+ return new ServerTestDriver(RemoteClient.newInstanceWithProtocolAndInternSlobrok(protocol, network), false, null,
+ protocol, guiceModules);
+ }
+
+ public static ServerTestDriver newUnboundInstance(boolean network, Module... guiceModules) {
+ return new ServerTestDriver(RemoteClient.newInstanceWithInternSlobrok(network), true, null,
+ new SimpleProtocol(), guiceModules);
+ }
+
+}