aboutsummaryrefslogtreecommitdiffstats
path: root/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/ClientTestDriver.java
diff options
context:
space:
mode:
Diffstat (limited to 'container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/ClientTestDriver.java')
-rw-r--r--container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/ClientTestDriver.java134
1 files changed, 134 insertions, 0 deletions
diff --git a/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/ClientTestDriver.java b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/ClientTestDriver.java
new file mode 100644
index 00000000000..111805d61b0
--- /dev/null
+++ b/container-messagebus/src/main/java/com/yahoo/messagebus/jdisc/test/ClientTestDriver.java
@@ -0,0 +1,134 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.jdisc.test;
+
+import com.yahoo.jdisc.References;
+import com.yahoo.jdisc.Request;
+import com.yahoo.jdisc.ResourceReference;
+import com.yahoo.jdisc.application.ContainerBuilder;
+import com.yahoo.jdisc.handler.ResponseHandler;
+import com.yahoo.jdisc.test.TestDriver;
+import com.yahoo.messagebus.*;
+import com.yahoo.messagebus.jdisc.MbusClient;
+import com.yahoo.messagebus.jdisc.MbusRequest;
+import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
+import com.yahoo.messagebus.routing.Route;
+import com.yahoo.messagebus.shared.SharedMessageBus;
+import com.yahoo.messagebus.shared.SharedSourceSession;
+import com.yahoo.messagebus.test.SimpleProtocol;
+
+import java.net.URI;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author Simon Thoresen Hult
+ */
+public class ClientTestDriver {
+
+ private final RemoteServer server;
+ private final MbusClient client;
+ private final SharedSourceSession session;
+ private final TestDriver driver;
+
+ private ClientTestDriver(RemoteServer server, Protocol protocol) {
+ this.server = server;
+
+ MessageBusParams mbusParams = new MessageBusParams().addProtocol(protocol);
+ RPCNetworkParams netParams = new RPCNetworkParams().setSlobrokConfigId(server.slobrokId());
+ SharedMessageBus mbus = SharedMessageBus.newInstance(mbusParams, netParams);
+ session = mbus.newSourceSession(new SourceSessionParams());
+ client = new MbusClient(session);
+ client.start();
+ mbus.release();
+
+ driver = TestDriver.newSimpleApplicationInstanceWithoutOsgi();
+ ContainerBuilder builder = driver.newContainerBuilder();
+ builder.clientBindings().bind("mbus://*/*", client);
+ driver.activateContainer(builder);
+ }
+
+ public SourceSession sourceSession() {
+ return session.session();
+ }
+
+ public Request newServerRequest() {
+ return new Request(driver, URI.create("mbus://localhost/"));
+ }
+
+ public Request newClientRequest(Message msg) {
+ msg.setRoute(Route.parse(server.connectionSpec()));
+ if (msg.getTrace().getLevel() == 0) {
+ msg.getTrace().setLevel(9);
+ }
+ final Request parent = newServerRequest();
+ try (final ResourceReference ref = References.fromResource(parent)) {
+ return new MbusRequest(parent, URI.create("mbus://remotehost/"), msg);
+ }
+ }
+
+ public boolean sendRequest(Request request, ResponseHandler responseHandler) {
+ request.connect(responseHandler).close(null);
+ return true;
+ }
+
+ public boolean sendMessage(Message msg, ResponseHandler responseHandler) {
+ final Request request = newClientRequest(msg);
+ try (final ResourceReference ref = References.fromResource(request)) {
+ return sendRequest(request, responseHandler);
+ }
+ }
+
+ public Message awaitMessage() {
+ Message msg = null;
+ try {
+ msg = server.awaitMessage(60, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ if (msg != null) {
+ msg.getTrace().trace(0, "Message received by RemoteServer.");
+ }
+ return msg;
+ }
+
+ public void sendReply(Reply reply) {
+ reply.getTrace().trace(0, "Sending reply from RemoteServer.");
+ server.sendReply(reply);
+ }
+
+ public boolean awaitMessageAndSendReply(Reply reply) {
+ Message msg = awaitMessage();
+ if (msg == null) {
+ return false;
+ }
+ reply.swapState(msg);
+ sendReply(reply);
+ return true;
+ }
+
+ public boolean close() {
+ session.release();
+ client.release();
+ server.close();
+ return driver.close();
+ }
+
+ public MbusClient client() {
+ return client;
+ }
+
+ public RemoteServer server() {
+ return server;
+ }
+
+ public static ClientTestDriver newInstance() {
+ return new ClientTestDriver(RemoteServer.newInstanceWithInternSlobrok(), new SimpleProtocol());
+ }
+
+ public static ClientTestDriver newInstanceWithProtocol(Protocol protocol) {
+ return new ClientTestDriver(RemoteServer.newInstanceWithInternSlobrok(), protocol);
+ }
+
+ public static ClientTestDriver newInstanceWithExternSlobrok(String slobrokId) {
+ return new ClientTestDriver(RemoteServer.newInstanceWithExternSlobrok(slobrokId), new SimpleProtocol());
+ }
+}