summaryrefslogtreecommitdiffstats
path: root/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test
diff options
context:
space:
mode:
authorJon Bratseth <bratseth@yahoo-inc.com>2016-06-15 23:09:44 +0200
committerJon Bratseth <bratseth@yahoo-inc.com>2016-06-15 23:09:44 +0200
commit72231250ed81e10d66bfe70701e64fa5fe50f712 (patch)
tree2728bba1131a6f6e5bdf95afec7d7ff9358dac50 /jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test
Publish
Diffstat (limited to 'jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test')
-rw-r--r--jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/ClientTestDriver.java134
-rw-r--r--jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/MessageQueue.java27
-rw-r--r--jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteClient.java73
-rw-r--r--jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteServer.java87
-rw-r--r--jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/ReplyQueue.java26
-rw-r--r--jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/ServerTestDriver.java154
6 files changed, 501 insertions, 0 deletions
diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/ClientTestDriver.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/ClientTestDriver.java
new file mode 100644
index 00000000000..b478f9fe8db
--- /dev/null
+++ b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/ClientTestDriver.java
@@ -0,0 +1,134 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.jdisc.test;
+
+import com.yahoo.jdisc.References;
+import com.yahoo.jdisc.Request;
+import com.yahoo.jdisc.ResourceReference;
+import com.yahoo.jdisc.application.ContainerBuilder;
+import com.yahoo.jdisc.handler.ResponseHandler;
+import com.yahoo.jdisc.test.TestDriver;
+import com.yahoo.messagebus.*;
+import com.yahoo.messagebus.jdisc.MbusClient;
+import com.yahoo.messagebus.jdisc.MbusRequest;
+import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
+import com.yahoo.messagebus.routing.Route;
+import com.yahoo.messagebus.shared.SharedMessageBus;
+import com.yahoo.messagebus.shared.SharedSourceSession;
+import com.yahoo.messagebus.test.SimpleProtocol;
+
+import java.net.URI;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public class ClientTestDriver {
+
+ private final RemoteServer server;
+ private final MbusClient client;
+ private final SharedSourceSession session;
+ private final TestDriver driver;
+
+ private ClientTestDriver(RemoteServer server, Protocol protocol) {
+ this.server = server;
+
+ MessageBusParams mbusParams = new MessageBusParams().addProtocol(protocol);
+ RPCNetworkParams netParams = new RPCNetworkParams().setSlobrokConfigId(server.slobrokId());
+ SharedMessageBus mbus = SharedMessageBus.newInstance(mbusParams, netParams);
+ session = mbus.newSourceSession(new SourceSessionParams());
+ client = new MbusClient(session);
+ client.start();
+ mbus.release();
+
+ driver = TestDriver.newSimpleApplicationInstanceWithoutOsgi();
+ ContainerBuilder builder = driver.newContainerBuilder();
+ builder.clientBindings().bind("mbus://*/*", client);
+ driver.activateContainer(builder);
+ }
+
+ public SourceSession sourceSession() {
+ return session.session();
+ }
+
+ public Request newServerRequest() {
+ return new Request(driver, URI.create("mbus://localhost/"));
+ }
+
+ public Request newClientRequest(Message msg) {
+ msg.setRoute(Route.parse(server.connectionSpec()));
+ if (msg.getTrace().getLevel() == 0) {
+ msg.getTrace().setLevel(9);
+ }
+ final Request parent = newServerRequest();
+ try (final ResourceReference ref = References.fromResource(parent)) {
+ return new MbusRequest(parent, URI.create("mbus://remotehost/"), msg);
+ }
+ }
+
+ public boolean sendRequest(Request request, ResponseHandler responseHandler) {
+ request.connect(responseHandler).close(null);
+ return true;
+ }
+
+ public boolean sendMessage(Message msg, ResponseHandler responseHandler) {
+ final Request request = newClientRequest(msg);
+ try (final ResourceReference ref = References.fromResource(request)) {
+ return sendRequest(request, responseHandler);
+ }
+ }
+
+ public Message awaitMessage() {
+ Message msg = null;
+ try {
+ msg = server.awaitMessage(60, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ if (msg != null) {
+ msg.getTrace().trace(0, "Message received by RemoteServer.");
+ }
+ return msg;
+ }
+
+ public void sendReply(Reply reply) {
+ reply.getTrace().trace(0, "Sending reply from RemoteServer.");
+ server.sendReply(reply);
+ }
+
+ public boolean awaitMessageAndSendReply(Reply reply) {
+ Message msg = awaitMessage();
+ if (msg == null) {
+ return false;
+ }
+ reply.swapState(msg);
+ sendReply(reply);
+ return true;
+ }
+
+ public boolean close() {
+ session.release();
+ client.release();
+ server.close();
+ return driver.close();
+ }
+
+ public MbusClient client() {
+ return client;
+ }
+
+ public RemoteServer server() {
+ return server;
+ }
+
+ public static ClientTestDriver newInstance() {
+ return new ClientTestDriver(RemoteServer.newInstanceWithInternSlobrok(), new SimpleProtocol());
+ }
+
+ public static ClientTestDriver newInstanceWithProtocol(Protocol protocol) {
+ return new ClientTestDriver(RemoteServer.newInstanceWithInternSlobrok(), protocol);
+ }
+
+ public static ClientTestDriver newInstanceWithExternSlobrok(String slobrokId) {
+ return new ClientTestDriver(RemoteServer.newInstanceWithExternSlobrok(slobrokId), new SimpleProtocol());
+ }
+}
diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/MessageQueue.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/MessageQueue.java
new file mode 100644
index 00000000000..be280d416f4
--- /dev/null
+++ b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/MessageQueue.java
@@ -0,0 +1,27 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.jdisc.test;
+
+import com.yahoo.messagebus.Message;
+import com.yahoo.messagebus.MessageHandler;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public class MessageQueue implements MessageHandler {
+
+ private final BlockingQueue<Message> queue = new LinkedBlockingQueue<>();
+
+ @Override
+ public void handleMessage(Message msg) {
+ queue.add(msg);
+ }
+
+ public Message awaitMessage(int timeout, TimeUnit unit) throws InterruptedException {
+ return queue.poll(timeout, unit);
+ }
+
+}
diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteClient.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteClient.java
new file mode 100644
index 00000000000..e918ab46d95
--- /dev/null
+++ b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteClient.java
@@ -0,0 +1,73 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.jdisc.test;
+
+import com.yahoo.jrt.ListenFailedException;
+import com.yahoo.jrt.slobrok.server.Slobrok;
+import com.yahoo.messagebus.*;
+import com.yahoo.messagebus.network.rpc.RPCNetwork;
+import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
+import com.yahoo.messagebus.test.SimpleProtocol;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public class RemoteClient {
+
+ private final Slobrok slobrok;
+ private final String slobrokId;
+ private final MessageBus mbus;
+ private final ReplyQueue queue = new ReplyQueue();
+ private final SourceSession session;
+
+ private RemoteClient(Slobrok slobrok, String slobrokId, Protocol protocol) {
+ this.slobrok = slobrok;
+ this.slobrokId = slobrok != null ? slobrok.configId() : slobrokId;
+ mbus = new MessageBus(new RPCNetwork(new RPCNetworkParams().setSlobrokConfigId(this.slobrokId)),
+ new MessageBusParams().addProtocol(protocol));
+ session = mbus.createSourceSession(new SourceSessionParams().setThrottlePolicy(null).setReplyHandler(queue));
+ }
+
+ public Result sendMessage(Message msg) {
+ return session.send(msg);
+ }
+
+ public Reply awaitReply(int timeout, TimeUnit unit) throws InterruptedException {
+ return queue.awaitReply(timeout, unit);
+ }
+
+ public String slobrokId() {
+ return slobrokId;
+ }
+
+ public void close() {
+ session.destroy();
+ mbus.destroy();
+ if (slobrok != null) {
+ slobrok.stop();
+ }
+ }
+
+ public static RemoteClient newInstanceWithInternSlobrok() {
+ return new RemoteClient(newSlobrok(), null, new SimpleProtocol());
+ }
+
+ public static RemoteClient newInstanceWithExternSlobrok(String slobrokId) {
+ return new RemoteClient(null, slobrokId, new SimpleProtocol());
+ }
+
+ public static RemoteClient newInstanceWithProtocolAndInternSlobrok(Protocol protocol) {
+ return new RemoteClient(newSlobrok(), null, protocol);
+ }
+
+ private static Slobrok newSlobrok() {
+ Slobrok slobrok;
+ try {
+ slobrok = new Slobrok();
+ } catch (ListenFailedException e) {
+ throw new IllegalStateException(e);
+ }
+ return slobrok;
+ }
+}
diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteServer.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteServer.java
new file mode 100644
index 00000000000..8bd058eab9b
--- /dev/null
+++ b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteServer.java
@@ -0,0 +1,87 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.jdisc.test;
+
+import com.yahoo.jrt.ListenFailedException;
+import com.yahoo.jrt.slobrok.server.Slobrok;
+import com.yahoo.messagebus.*;
+import com.yahoo.messagebus.network.Identity;
+import com.yahoo.messagebus.network.rpc.RPCNetwork;
+import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
+import com.yahoo.messagebus.test.SimpleProtocol;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public class RemoteServer {
+
+ private final Slobrok slobrok;
+ private final String slobrokId;
+ private final MessageBus mbus;
+ private final MessageQueue queue = new MessageQueue();
+ private final DestinationSession session;
+
+ private RemoteServer(Slobrok slobrok, String slobrokId, Protocol protocol, String identity) {
+ this.slobrok = slobrok;
+ this.slobrokId = slobrok != null ? slobrok.configId() : slobrokId;
+ mbus = new MessageBus(new RPCNetwork(new RPCNetworkParams()
+ .setSlobrokConfigId(this.slobrokId)
+ .setIdentity(new Identity(identity))),
+ new MessageBusParams().addProtocol(protocol));
+ session = mbus.createDestinationSession(new DestinationSessionParams().setMessageHandler(queue));
+ }
+
+ public String connectionSpec() {
+ return session.getConnectionSpec();
+ }
+
+ public Message awaitMessage(int timeout, TimeUnit unit) throws InterruptedException {
+ return queue.awaitMessage(timeout, unit);
+ }
+
+ public void ackMessage(Message msg) {
+ session.acknowledge(msg);
+ }
+
+ public void sendReply(Reply reply) {
+ session.reply(reply);
+ }
+
+ public String slobrokId() {
+ return slobrokId;
+ }
+
+ public void close() {
+ session.destroy();
+ mbus.destroy();
+ if (slobrok != null) {
+ slobrok.stop();
+ }
+ }
+
+ public static RemoteServer newInstanceWithInternSlobrok() {
+ return new RemoteServer(newSlobrok(), null, new SimpleProtocol(), "remote");
+ }
+
+ public static RemoteServer newInstanceWithExternSlobrok(String slobrokId) {
+ return new RemoteServer(null, slobrokId, new SimpleProtocol(), "remote");
+ }
+
+ public static RemoteServer newInstance(String slobrokId, String identity, Protocol protocol) {
+ return new RemoteServer(null, slobrokId, protocol, identity);
+ }
+
+ public static RemoteServer newInstanceWithProtocol(Protocol protocol) {
+ return new RemoteServer(newSlobrok(), null, protocol, "remote");
+ }
+
+ private static Slobrok newSlobrok() {
+ try {
+ return new Slobrok();
+ } catch (ListenFailedException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+}
diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/ReplyQueue.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/ReplyQueue.java
new file mode 100644
index 00000000000..9418733a221
--- /dev/null
+++ b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/ReplyQueue.java
@@ -0,0 +1,26 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.jdisc.test;
+
+import com.yahoo.messagebus.Reply;
+import com.yahoo.messagebus.ReplyHandler;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public class ReplyQueue implements ReplyHandler {
+
+ private final BlockingQueue<Reply> queue = new LinkedBlockingQueue<>();
+
+ @Override
+ public void handleReply(Reply reply) {
+ queue.add(reply);
+ }
+
+ public Reply awaitReply(int timeout, TimeUnit unit) throws InterruptedException {
+ return queue.poll(timeout, unit);
+ }
+}
diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/ServerTestDriver.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/ServerTestDriver.java
new file mode 100644
index 00000000000..1ef492c18aa
--- /dev/null
+++ b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/ServerTestDriver.java
@@ -0,0 +1,154 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.jdisc.test;
+
+import com.google.inject.Module;
+import com.yahoo.jdisc.application.ContainerBuilder;
+import com.yahoo.jdisc.handler.RequestHandler;
+import com.yahoo.jdisc.test.TestDriver;
+import com.yahoo.messagebus.*;
+import com.yahoo.messagebus.Error;
+import com.yahoo.messagebus.jdisc.MbusServer;
+import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
+import com.yahoo.messagebus.routing.Route;
+import com.yahoo.messagebus.shared.ServerSession;
+import com.yahoo.messagebus.shared.SharedMessageBus;
+import com.yahoo.messagebus.test.SimpleProtocol;
+
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public class ServerTestDriver {
+
+ private final RemoteClient client;
+ private final MbusServer server;
+ private final TestDriver driver;
+
+ private ServerTestDriver(RemoteClient client, boolean activateContainer, RequestHandler requestHandler,
+ Protocol protocol, Module... guiceModules)
+ {
+ this.client = client;
+ driver = TestDriver.newSimpleApplicationInstanceWithoutOsgi(guiceModules);
+ if (activateContainer) {
+ ContainerBuilder builder = driver.newContainerBuilder();
+ if (requestHandler != null) {
+ builder.serverBindings().bind("mbus://*/*", requestHandler);
+ }
+ driver.activateContainer(builder);
+ }
+
+ MessageBusParams mbusParams = new MessageBusParams().addProtocol(protocol);
+ RPCNetworkParams netParams = new RPCNetworkParams().setSlobrokConfigId(client.slobrokId());
+ SharedMessageBus mbus = SharedMessageBus.newInstance(mbusParams, netParams);
+ ServerSession session = mbus.newDestinationSession(new DestinationSessionParams());
+ server = new MbusServer(driver, session);
+ server.start();
+ session.release();
+ mbus.release();
+ }
+
+ public boolean sendMessage(Message msg) {
+ msg.setRoute(Route.parse(server.connectionSpec()));
+ msg.getTrace().setLevel(9);
+ return client.sendMessage(msg).isAccepted();
+ }
+
+ public Reply awaitReply() {
+ Reply reply = null;
+ try {
+ reply = client.awaitReply(60, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ if (reply != null) {
+ System.out.println(reply.getTrace());
+ }
+ return reply;
+ }
+
+ public Reply awaitSuccess() {
+ Reply reply = awaitReply();
+ if (reply == null || reply.hasErrors()) {
+ return null;
+ }
+ return reply;
+ }
+
+ public Reply awaitErrors(Integer... errCodes) {
+ Reply reply = awaitReply();
+ if (reply == null) {
+ return null;
+ }
+ List<Integer> lst = new LinkedList<>(Arrays.asList(errCodes));
+ for (int i = 0, len = reply.getNumErrors(); i < len; ++i) {
+ Error err = reply.getError(i);
+ System.out.println(err);
+ int idx = lst.indexOf(err.getCode());
+ if (idx < 0) {
+ return null;
+ }
+ lst.remove(idx);
+ }
+ if (!lst.isEmpty()) {
+ return null;
+ }
+ return reply;
+ }
+
+ public boolean close() {
+ server.close();
+ server.release();
+ client.close();
+ return driver.close();
+ }
+
+ public TestDriver parent() {
+ return driver;
+ }
+
+ public RemoteClient client() {
+ return client;
+ }
+
+ public MbusServer server() {
+ return server;
+ }
+
+ public static ServerTestDriver newInstance(RequestHandler requestHandler, Module... guiceModules) {
+ return new ServerTestDriver(RemoteClient.newInstanceWithInternSlobrok(), true, requestHandler,
+ new SimpleProtocol(), guiceModules);
+ }
+
+ public static ServerTestDriver newInstanceWithProtocol(Protocol protocol, RequestHandler requestHandler,
+ Module... guiceModules)
+ {
+ return new ServerTestDriver(RemoteClient.newInstanceWithInternSlobrok(), true, requestHandler, protocol,
+ guiceModules);
+ }
+
+ public static ServerTestDriver newInstanceWithExternSlobrok(String slobrokId, RequestHandler requestHandler,
+ Module... guiceModules)
+ {
+ return new ServerTestDriver(RemoteClient.newInstanceWithExternSlobrok(slobrokId),
+ true, requestHandler, new SimpleProtocol(), guiceModules);
+ }
+
+ public static ServerTestDriver newInactiveInstance(Module... guiceModules) {
+ return new ServerTestDriver(RemoteClient.newInstanceWithInternSlobrok(), false, null,
+ new SimpleProtocol(), guiceModules);
+ }
+
+ public static ServerTestDriver newInactiveInstanceWithProtocol(Protocol protocol, Module... guiceModules) {
+ return new ServerTestDriver(RemoteClient.newInstanceWithProtocolAndInternSlobrok(protocol), false, null,
+ protocol, guiceModules);
+ }
+
+ public static ServerTestDriver newUnboundInstance(Module... guiceModules) {
+ return new ServerTestDriver(RemoteClient.newInstanceWithInternSlobrok(), true, null,
+ new SimpleProtocol(), guiceModules);
+ }
+}