summaryrefslogtreecommitdiffstats
path: root/jdisc_messagebus_service
diff options
context:
space:
mode:
authorgjoranv <gv@verizonmedia.com>2021-03-29 21:35:59 +0200
committergjoranv <gv@verizonmedia.com>2021-03-29 21:37:08 +0200
commit8a2d6c1c4110ddf508f3a467eeb2ab031998591a (patch)
treeb0590dc285312b39e13b84b65b72164ee244e9af /jdisc_messagebus_service
parent763b534e158b41224c6cce3e5fd063bcecd48dcb (diff)
Remove jdisc_messagebus_service and messagebus-disc modules.
- They have been merged into container-messagebus
Diffstat (limited to 'jdisc_messagebus_service')
-rw-r--r--jdisc_messagebus_service/.gitignore2
-rw-r--r--jdisc_messagebus_service/OWNERS2
-rw-r--r--jdisc_messagebus_service/README.md4
-rw-r--r--jdisc_messagebus_service/pom.xml54
-rw-r--r--jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/IgnoredCompletionHandler.java22
-rw-r--r--jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/MbusClient.java147
-rw-r--r--jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/MbusRequest.java38
-rw-r--r--jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/MbusRequestHandler.java59
-rw-r--r--jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/MbusResponse.java25
-rw-r--r--jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/MbusServer.java135
-rw-r--r--jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/StatusCodes.java77
-rw-r--r--jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/package-info.java5
-rw-r--r--jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/ClientTestDriver.java134
-rw-r--r--jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/MessageQueue.java27
-rw-r--r--jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteClient.java76
-rw-r--r--jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteServer.java87
-rw-r--r--jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/ReplyQueue.java26
-rw-r--r--jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/ServerTestDriver.java155
-rw-r--r--jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/network/package-info.java5
-rw-r--r--jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/network/rpc/package-info.java5
-rw-r--r--jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/package-info.java5
-rw-r--r--jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/routing/package-info.java5
-rw-r--r--jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/ClientSession.java14
-rw-r--r--jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/NullNetwork.java73
-rw-r--r--jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/ServerSession.java22
-rw-r--r--jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/SharedDestinationSession.java85
-rw-r--r--jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/SharedIntermediateSession.java104
-rw-r--r--jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/SharedMessageBus.java68
-rw-r--r--jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/SharedSourceSession.java58
-rw-r--r--jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/package-info.java8
-rw-r--r--jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/test/package-info.java5
-rw-r--r--jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/ClientThreadingTestCase.java149
-rw-r--r--jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/MbusClientTestCase.java345
-rw-r--r--jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/MbusRequestHandlerTestCase.java121
-rw-r--r--jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/MbusRequestTestCase.java73
-rw-r--r--jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/MbusResponseTestCase.java46
-rw-r--r--jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/MbusServerConformanceTest.java694
-rw-r--r--jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/MbusServerTestCase.java374
-rw-r--r--jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/ServerThreadingTestCase.java137
-rw-r--r--jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/test/ClientTestDriverTestCase.java32
-rw-r--r--jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/test/ServerTestDriverTestCase.java34
-rw-r--r--jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/shared/SharedDestinationSessionTestCase.java134
-rw-r--r--jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/shared/SharedIntermediateSessionTestCase.java174
-rw-r--r--jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/shared/SharedMessageBusTestCase.java37
-rw-r--r--jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/shared/SharedSourceSessionTestCase.java94
45 files changed, 0 insertions, 3976 deletions
diff --git a/jdisc_messagebus_service/.gitignore b/jdisc_messagebus_service/.gitignore
deleted file mode 100644
index 12251442258..00000000000
--- a/jdisc_messagebus_service/.gitignore
+++ /dev/null
@@ -1,2 +0,0 @@
-/target
-/pom.xml.build
diff --git a/jdisc_messagebus_service/OWNERS b/jdisc_messagebus_service/OWNERS
deleted file mode 100644
index 78b92e411b4..00000000000
--- a/jdisc_messagebus_service/OWNERS
+++ /dev/null
@@ -1,2 +0,0 @@
-gjoranv
-bjorncs
diff --git a/jdisc_messagebus_service/README.md b/jdisc_messagebus_service/README.md
deleted file mode 100644
index cecb21de952..00000000000
--- a/jdisc_messagebus_service/README.md
+++ /dev/null
@@ -1,4 +0,0 @@
-<!-- Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -->
-# JDisc messagebus service
-
-Messagebus protocol implementation for JDisc.
diff --git a/jdisc_messagebus_service/pom.xml b/jdisc_messagebus_service/pom.xml
deleted file mode 100644
index 55f8392a5df..00000000000
--- a/jdisc_messagebus_service/pom.xml
+++ /dev/null
@@ -1,54 +0,0 @@
-<?xml version="1.0"?>
-<!-- Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
- http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>com.yahoo.vespa</groupId>
- <artifactId>parent</artifactId>
- <version>7-SNAPSHOT</version>
- <relativePath>../parent/pom.xml</relativePath>
- </parent>
- <artifactId>jdisc_messagebus_service</artifactId>
- <version>7-SNAPSHOT</version>
- <packaging>container-plugin</packaging>
- <dependencies>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>com.yahoo.vespa</groupId>
- <artifactId>jdisc_core</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>com.yahoo.vespa</groupId>
- <artifactId>component</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>com.yahoo.vespa</groupId>
- <artifactId>messagebus</artifactId>
- <version>${project.version}</version>
- </dependency>
- </dependencies>
- <build>
- <plugins>
- <plugin>
- <groupId>com.yahoo.vespa</groupId>
- <artifactId>bundle-plugin</artifactId>
- <extensions>true</extensions>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- </plugin>
- </plugins>
- </build>
-</project>
diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/IgnoredCompletionHandler.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/IgnoredCompletionHandler.java
deleted file mode 100644
index c64fea8653b..00000000000
--- a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/IgnoredCompletionHandler.java
+++ /dev/null
@@ -1,22 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.messagebus.jdisc;
-
-import com.yahoo.jdisc.handler.CompletionHandler;
-
-/**
- * @author Simon Thoresen Hult
- */
-enum IgnoredCompletionHandler implements CompletionHandler {
-
- INSTANCE;
-
- @Override
- public void completed() {
-
- }
-
- @Override
- public void failed(final Throwable t) {
-
- }
-}
diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/MbusClient.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/MbusClient.java
deleted file mode 100644
index 922e4140868..00000000000
--- a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/MbusClient.java
+++ /dev/null
@@ -1,147 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.messagebus.jdisc;
-
-import com.google.inject.Inject;
-import com.yahoo.jdisc.AbstractResource;
-import com.yahoo.jdisc.Request;
-import com.yahoo.jdisc.ResourceReference;
-import com.yahoo.jdisc.handler.ContentChannel;
-import com.yahoo.jdisc.handler.RequestDeniedException;
-import com.yahoo.jdisc.handler.ResponseHandler;
-import com.yahoo.jdisc.service.ClientProvider;
-import java.util.logging.Level;
-import com.yahoo.messagebus.EmptyReply;
-import com.yahoo.messagebus.Error;
-import com.yahoo.messagebus.ErrorCode;
-import com.yahoo.messagebus.Message;
-import com.yahoo.messagebus.Reply;
-import com.yahoo.messagebus.ReplyHandler;
-import com.yahoo.messagebus.shared.ClientSession;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.logging.Logger;
-
-/**
- * @author Simon Thoresen Hult
- */
-public final class MbusClient extends AbstractResource implements ClientProvider, ReplyHandler {
-
- private static final Logger log = Logger.getLogger(MbusClient.class.getName());
- private final BlockingQueue<MbusRequest> queue = new LinkedBlockingQueue<>();
- private final ClientSession session;
- private final Thread thread = new Thread(new SenderTask(), "MbusClient");
- private volatile boolean done = false;
- private final ResourceReference sessionReference;
-
- @Inject
- public MbusClient(ClientSession session) {
- this.session = session;
- this.sessionReference = session.refer();
- }
-
- @Override
- public void start() {
- thread.start();
- }
-
- @Override
- public ContentChannel handleRequest(Request request, ResponseHandler handler) {
- if (!(request instanceof MbusRequest)) {
- throw new RequestDeniedException(request);
- }
- final Message msg = ((MbusRequest)request).getMessage();
- msg.getTrace().trace(6, "Request received by MbusClient.");
- msg.pushHandler(null); // save user context
- final Long timeout = request.timeRemaining(TimeUnit.MILLISECONDS);
- if (timeout != null) {
- msg.setTimeReceivedNow();
- msg.setTimeRemaining(Math.max(1, timeout)); // negative or zero timeout has semantics
- }
- msg.setContext(handler);
- msg.pushHandler(this);
- sendBlocking((MbusRequest)request);
- return null;
- }
-
- @Override
- public void handleTimeout(Request request, final ResponseHandler handler) {
- // ignore, mbus has guaranteed reply
- }
-
- @Override
- protected void destroy() {
- log.log(Level.FINE, "Destroying message bus client.");
- sessionReference.close();
- done = true;
- }
-
- @Override
- public void handleReply(final Reply reply) {
- reply.getTrace().trace(6, "Reply received by MbusClient.");
- final ResponseHandler handler = (ResponseHandler)reply.getContext();
- reply.popHandler(); // restore user context
- try {
- handler.handleResponse(new MbusResponse(StatusCodes.fromMbusReply(reply), reply))
- .close(IgnoredCompletionHandler.INSTANCE);
- } catch (final Exception e) {
- log.log(Level.WARNING, "Ignoring exception thrown by ResponseHandler.", e);
- }
- }
-
- private void sendBlocking(MbusRequest request) {
- while (!sendMessage(request)) {
- try {
- Thread.sleep(5);
- } catch (final InterruptedException e) {
- // ignore
- }
- }
- }
-
- private boolean sendMessage(MbusRequest request) {
- Error error;
- final Long millis = request.timeRemaining(TimeUnit.MILLISECONDS);
- if (millis != null && millis <= 0) {
- error = new Error(ErrorCode.TIMEOUT, request.getTimeout(TimeUnit.MILLISECONDS) + " millis");
- } else if (request.isCancelled()) {
- error = new Error(ErrorCode.APP_FATAL_ERROR, "request cancelled");
- } else {
- try {
- error = session.sendMessage(request.getMessage()).getError();
- } catch (final Exception e) {
- error = new Error(ErrorCode.FATAL_ERROR, e.toString());
- }
- }
- if (error == null) {
- return true;
- }
- if (error.isFatal()) {
- final Reply reply = new EmptyReply();
- reply.swapState(request.getMessage());
- reply.addError(error);
- reply.popHandler().handleReply(reply);
- return true;
- }
- return false;
- }
-
- private class SenderTask implements Runnable {
-
- @Override
- public void run() {
- while (!done) {
- try {
- final MbusRequest request = queue.poll(100, TimeUnit.MILLISECONDS);
- if (request == null) {
- continue;
- }
- sendBlocking(request);
- } catch (final Exception e) {
- log.log(Level.WARNING, "Ignoring exception thrown by MbusClient.", e);
- }
- }
- }
- }
-}
diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/MbusRequest.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/MbusRequest.java
deleted file mode 100644
index a0bedd678eb..00000000000
--- a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/MbusRequest.java
+++ /dev/null
@@ -1,38 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.messagebus.jdisc;
-
-import com.yahoo.jdisc.Request;
-import com.yahoo.jdisc.service.CurrentContainer;
-import com.yahoo.messagebus.Message;
-
-import java.net.URI;
-
-/**
- * @author Simon Thoresen Hult
- */
-public class MbusRequest extends Request {
-
- private final Message message;
-
- public MbusRequest(CurrentContainer current, URI uri, Message msg) {
- super(current, uri);
- this.message = validateMessage(msg);
- }
-
- public MbusRequest(Request parent, URI uri, Message msg) {
- super(parent, uri);
- this.message = validateMessage(msg);
- }
-
- public Message getMessage() {
- return message;
- }
-
- private Message validateMessage(Message msg) {
- if (msg != null) {
- return msg;
- }
- release();
- throw new NullPointerException();
- }
-}
diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/MbusRequestHandler.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/MbusRequestHandler.java
deleted file mode 100644
index fb5657a9215..00000000000
--- a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/MbusRequestHandler.java
+++ /dev/null
@@ -1,59 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.messagebus.jdisc;
-
-import com.yahoo.jdisc.Request;
-import com.yahoo.jdisc.handler.AbstractRequestHandler;
-import com.yahoo.jdisc.handler.CompletionHandler;
-import com.yahoo.jdisc.handler.ContentChannel;
-import com.yahoo.jdisc.handler.ResponseHandler;
-import com.yahoo.messagebus.Message;
-import com.yahoo.messagebus.MessageHandler;
-import com.yahoo.messagebus.Reply;
-import com.yahoo.messagebus.ReplyHandler;
-
-/**
- * @author Simon Thoresen Hult
- */
-public abstract class MbusRequestHandler extends AbstractRequestHandler implements MessageHandler {
-
- @Override
- public ContentChannel handleRequest(final Request request, final ResponseHandler handler) {
- if (!(request instanceof MbusRequest)) {
- throw new UnsupportedOperationException("Expected MbusRequest, got " + request.getClass().getName() + ".");
- }
- final Message msg = ((MbusRequest)request).getMessage();
- msg.pushHandler(new RespondingReplyHandler(handler));
- handleMessage(msg);
- return null;
- }
-
- private static class RespondingReplyHandler implements ReplyHandler {
-
- private final ResponseHandler handler;
-
- RespondingReplyHandler(final ResponseHandler handler) {
- this.handler = handler;
- }
-
- @Override
- public void handleReply(final Reply reply) {
- final MbusResponse response = new MbusResponse(StatusCodes.fromMbusReply(reply), reply);
- handler.handleResponse(response).close(IgnoringCompletionHandler.INSTANCE);
- }
- }
-
- private static class IgnoringCompletionHandler implements CompletionHandler {
-
- public static final IgnoringCompletionHandler INSTANCE = new IgnoringCompletionHandler();
-
- @Override
- public void completed() {
-
- }
-
- @Override
- public void failed(final Throwable t) {
-
- }
- }
-}
diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/MbusResponse.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/MbusResponse.java
deleted file mode 100644
index 37da4d8569f..00000000000
--- a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/MbusResponse.java
+++ /dev/null
@@ -1,25 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.messagebus.jdisc;
-
-import com.yahoo.jdisc.Response;
-import com.yahoo.messagebus.Reply;
-
-/**
- * @author Simon Thoresen Hult
- */
-public class MbusResponse extends Response {
-
- private final Reply reply;
-
- public MbusResponse(int status, Reply reply) {
- super(status);
- if (reply == null) {
- throw new NullPointerException();
- }
- this.reply = reply;
- }
-
- public Reply getReply() {
- return reply;
- }
-}
diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/MbusServer.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/MbusServer.java
deleted file mode 100644
index e26e1e7e134..00000000000
--- a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/MbusServer.java
+++ /dev/null
@@ -1,135 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.messagebus.jdisc;
-
-import com.google.inject.Inject;
-import com.yahoo.jdisc.AbstractResource;
-import com.yahoo.jdisc.Request;
-import com.yahoo.jdisc.ResourceReference;
-import com.yahoo.jdisc.Response;
-import com.yahoo.jdisc.handler.ContentChannel;
-import com.yahoo.jdisc.handler.ResponseHandler;
-import com.yahoo.jdisc.service.CurrentContainer;
-import com.yahoo.jdisc.service.ServerProvider;
-import java.util.logging.Level;
-import com.yahoo.messagebus.*;
-import com.yahoo.messagebus.Error;
-import com.yahoo.messagebus.shared.ServerSession;
-
-import java.net.URI;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.logging.Logger;
-
-/**
- * @author Simon Thoresen Hult
- */
-public final class MbusServer extends AbstractResource implements ServerProvider, MessageHandler {
-
- private final static Logger log = Logger.getLogger(MbusServer.class.getName());
- private final AtomicBoolean running = new AtomicBoolean(false);
- private final CurrentContainer container;
- private final ServerSession session;
- private final URI uri;
- private final ResourceReference sessionReference;
-
- @Inject
- public MbusServer(CurrentContainer container, ServerSession session) {
- this.container = container;
- this.session = session;
- uri = URI.create("mbus://localhost/" + session.name());
- session.setMessageHandler(this);
- sessionReference = session.refer();
- }
-
- @Override
- public void start() {
- log.log(Level.FINE, "Starting message bus server.");
- running.set(true);
- }
-
- @Override
- public void close() {
- log.log(Level.FINE, "Closing message bus server.");
- running.set(false);
- }
-
- @Override
- protected void destroy() {
- log.log(Level.FINE, "Destroying message bus server.");
- running.set(false);
- sessionReference.close();
- }
-
- @Override
- public void handleMessage(Message msg) {
- if (!running.get()) {
- dispatchErrorReply(msg, ErrorCode.SESSION_BUSY, "Session temporarily closed.");
- return;
- }
- if (msg.getTrace().shouldTrace(6)) {
- msg.getTrace().trace(6, "Message received by MbusServer.");
- }
- Request request = null;
- ContentChannel content = null;
- try {
- request = new MbusRequest(container, uri, msg);
- content = request.connect(new ServerResponseHandler(msg));
- } catch (RuntimeException e) {
- dispatchErrorReply(msg, ErrorCode.APP_FATAL_ERROR, e.toString());
- } finally {
- if (request != null) {
- request.release();
- }
- }
- if (content != null) {
- content.close(IgnoredCompletionHandler.INSTANCE);
- }
- }
-
- public String connectionSpec() {
- return session.connectionSpec();
- }
-
- private void dispatchErrorReply(Message msg, int errCode, String errMsg) {
- Reply reply = new EmptyReply();
- reply.swapState(msg);
- reply.addError(new Error(errCode, errMsg));
- session.sendReply(reply);
- }
-
- private class ServerResponseHandler implements ResponseHandler {
-
- final Message msg;
-
- ServerResponseHandler(Message msg) {
- this.msg = msg;
- }
-
- @Override
- public ContentChannel handleResponse(Response response) {
- Reply reply;
- if (response instanceof MbusResponse) {
- reply = ((MbusResponse)response).getReply();
- } else {
- reply = new EmptyReply();
- reply.swapState(msg);
- }
- Error err = StatusCodes.toMbusError(response.getStatus());
- if (err != null) {
- if (err.isFatal()) {
- if (!reply.hasFatalErrors()) {
- reply.addError(err);
- }
- } else {
- if (!reply.hasErrors()) {
- reply.addError(err);
- }
- }
- }
- if (reply.getTrace().shouldTrace(6)) {
- reply.getTrace().trace(6, "Sending reply from MbusServer.");
- }
- session.sendReply(reply);
- return null;
- }
- }
-}
diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/StatusCodes.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/StatusCodes.java
deleted file mode 100644
index 6570c910af3..00000000000
--- a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/StatusCodes.java
+++ /dev/null
@@ -1,77 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.messagebus.jdisc;
-
-import com.yahoo.jdisc.Response;
-import com.yahoo.messagebus.Error;
-import com.yahoo.messagebus.ErrorCode;
-import com.yahoo.messagebus.Reply;
-
-/**
- * @author Simon Thoresen Hult
- */
-public class StatusCodes {
-
- public static int fromMbusReply(final Reply reply) {
- int statusCode = Response.Status.OK;
- for (int i = 0, len = reply.getNumErrors(); i < len; ++i) {
- statusCode = Math.max(statusCode, fromMbusError(reply.getError(i)));
- }
- return statusCode;
- }
-
- public static int fromMbusError(final Error error) {
- final int errorCode = error.getCode();
- if (errorCode < ErrorCode.TRANSIENT_ERROR) {
- return Response.Status.OK;
- }
- if (errorCode < ErrorCode.FATAL_ERROR) {
- return Response.Status.TEMPORARY_REDIRECT;
- }
- switch (errorCode) {
- case ErrorCode.SEND_QUEUE_CLOSED:
- return Response.Status.LOCKED;
- case ErrorCode.ILLEGAL_ROUTE:
- return Response.Status.BAD_REQUEST;
- case ErrorCode.NO_SERVICES_FOR_ROUTE:
- return Response.Status.NOT_FOUND;
- case ErrorCode.ENCODE_ERROR:
- return Response.Status.BAD_REQUEST;
- case ErrorCode.NETWORK_ERROR:
- return Response.Status.BAD_REQUEST; // got nothing better
- case ErrorCode.UNKNOWN_PROTOCOL:
- return Response.Status.UNSUPPORTED_MEDIA_TYPE;
- case ErrorCode.DECODE_ERROR:
- return Response.Status.UNSUPPORTED_MEDIA_TYPE;
- case ErrorCode.TIMEOUT:
- return Response.Status.REQUEST_TIMEOUT;
- case ErrorCode.INCOMPATIBLE_VERSION:
- return Response.Status.VERSION_NOT_SUPPORTED;
- case ErrorCode.UNKNOWN_POLICY:
- return Response.Status.BAD_REQUEST;
- case ErrorCode.NETWORK_SHUTDOWN:
- return Response.Status.LOCKED;
- case ErrorCode.POLICY_ERROR:
- return Response.Status.PRECONDITION_FAILED;
- case ErrorCode.SEQUENCE_ERROR:
- return Response.Status.PRECONDITION_FAILED;
- case ErrorCode.APP_FATAL_ERROR:
- return Response.Status.INTERNAL_SERVER_ERROR;
- default:
- return Response.Status.INTERNAL_SERVER_ERROR;
- }
- }
-
- public static Error toMbusError(final int statusCode) {
- if (statusCode < 300) {
- return null;
- } else if (statusCode < 400) {
- return new Error(ErrorCode.APP_TRANSIENT_ERROR, statusCode + " Redirection");
- } else if (statusCode < 500) {
- return new Error(ErrorCode.APP_FATAL_ERROR, statusCode + " Client Error");
- } else if (statusCode < 600) {
- return new Error(ErrorCode.APP_FATAL_ERROR, statusCode + " Server Error");
- } else {
- return new Error(ErrorCode.APP_FATAL_ERROR, statusCode + " Unknown Error");
- }
- }
-}
diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/package-info.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/package-info.java
deleted file mode 100644
index 9aea8cf7db8..00000000000
--- a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/package-info.java
+++ /dev/null
@@ -1,5 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-@ExportPackage
-package com.yahoo.messagebus.jdisc;
-
-import com.yahoo.osgi.annotation.ExportPackage;
diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/ClientTestDriver.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/ClientTestDriver.java
deleted file mode 100644
index 111805d61b0..00000000000
--- a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/ClientTestDriver.java
+++ /dev/null
@@ -1,134 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.messagebus.jdisc.test;
-
-import com.yahoo.jdisc.References;
-import com.yahoo.jdisc.Request;
-import com.yahoo.jdisc.ResourceReference;
-import com.yahoo.jdisc.application.ContainerBuilder;
-import com.yahoo.jdisc.handler.ResponseHandler;
-import com.yahoo.jdisc.test.TestDriver;
-import com.yahoo.messagebus.*;
-import com.yahoo.messagebus.jdisc.MbusClient;
-import com.yahoo.messagebus.jdisc.MbusRequest;
-import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
-import com.yahoo.messagebus.routing.Route;
-import com.yahoo.messagebus.shared.SharedMessageBus;
-import com.yahoo.messagebus.shared.SharedSourceSession;
-import com.yahoo.messagebus.test.SimpleProtocol;
-
-import java.net.URI;
-import java.util.concurrent.TimeUnit;
-
-/**
- * @author Simon Thoresen Hult
- */
-public class ClientTestDriver {
-
- private final RemoteServer server;
- private final MbusClient client;
- private final SharedSourceSession session;
- private final TestDriver driver;
-
- private ClientTestDriver(RemoteServer server, Protocol protocol) {
- this.server = server;
-
- MessageBusParams mbusParams = new MessageBusParams().addProtocol(protocol);
- RPCNetworkParams netParams = new RPCNetworkParams().setSlobrokConfigId(server.slobrokId());
- SharedMessageBus mbus = SharedMessageBus.newInstance(mbusParams, netParams);
- session = mbus.newSourceSession(new SourceSessionParams());
- client = new MbusClient(session);
- client.start();
- mbus.release();
-
- driver = TestDriver.newSimpleApplicationInstanceWithoutOsgi();
- ContainerBuilder builder = driver.newContainerBuilder();
- builder.clientBindings().bind("mbus://*/*", client);
- driver.activateContainer(builder);
- }
-
- public SourceSession sourceSession() {
- return session.session();
- }
-
- public Request newServerRequest() {
- return new Request(driver, URI.create("mbus://localhost/"));
- }
-
- public Request newClientRequest(Message msg) {
- msg.setRoute(Route.parse(server.connectionSpec()));
- if (msg.getTrace().getLevel() == 0) {
- msg.getTrace().setLevel(9);
- }
- final Request parent = newServerRequest();
- try (final ResourceReference ref = References.fromResource(parent)) {
- return new MbusRequest(parent, URI.create("mbus://remotehost/"), msg);
- }
- }
-
- public boolean sendRequest(Request request, ResponseHandler responseHandler) {
- request.connect(responseHandler).close(null);
- return true;
- }
-
- public boolean sendMessage(Message msg, ResponseHandler responseHandler) {
- final Request request = newClientRequest(msg);
- try (final ResourceReference ref = References.fromResource(request)) {
- return sendRequest(request, responseHandler);
- }
- }
-
- public Message awaitMessage() {
- Message msg = null;
- try {
- msg = server.awaitMessage(60, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- if (msg != null) {
- msg.getTrace().trace(0, "Message received by RemoteServer.");
- }
- return msg;
- }
-
- public void sendReply(Reply reply) {
- reply.getTrace().trace(0, "Sending reply from RemoteServer.");
- server.sendReply(reply);
- }
-
- public boolean awaitMessageAndSendReply(Reply reply) {
- Message msg = awaitMessage();
- if (msg == null) {
- return false;
- }
- reply.swapState(msg);
- sendReply(reply);
- return true;
- }
-
- public boolean close() {
- session.release();
- client.release();
- server.close();
- return driver.close();
- }
-
- public MbusClient client() {
- return client;
- }
-
- public RemoteServer server() {
- return server;
- }
-
- public static ClientTestDriver newInstance() {
- return new ClientTestDriver(RemoteServer.newInstanceWithInternSlobrok(), new SimpleProtocol());
- }
-
- public static ClientTestDriver newInstanceWithProtocol(Protocol protocol) {
- return new ClientTestDriver(RemoteServer.newInstanceWithInternSlobrok(), protocol);
- }
-
- public static ClientTestDriver newInstanceWithExternSlobrok(String slobrokId) {
- return new ClientTestDriver(RemoteServer.newInstanceWithExternSlobrok(slobrokId), new SimpleProtocol());
- }
-}
diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/MessageQueue.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/MessageQueue.java
deleted file mode 100644
index c5287165e27..00000000000
--- a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/MessageQueue.java
+++ /dev/null
@@ -1,27 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.messagebus.jdisc.test;
-
-import com.yahoo.messagebus.Message;
-import com.yahoo.messagebus.MessageHandler;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-/**
- * @author Simon Thoresen Hult
- */
-public class MessageQueue implements MessageHandler {
-
- private final BlockingQueue<Message> queue = new LinkedBlockingQueue<>();
-
- @Override
- public void handleMessage(Message msg) {
- queue.add(msg);
- }
-
- public Message awaitMessage(int timeout, TimeUnit unit) throws InterruptedException {
- return queue.poll(timeout, unit);
- }
-
-}
diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteClient.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteClient.java
deleted file mode 100644
index 57d0abd980b..00000000000
--- a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteClient.java
+++ /dev/null
@@ -1,76 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.messagebus.jdisc.test;
-
-import com.yahoo.jrt.ListenFailedException;
-import com.yahoo.jrt.slobrok.server.Slobrok;
-import com.yahoo.messagebus.*;
-import com.yahoo.messagebus.network.local.LocalNetwork;
-import com.yahoo.messagebus.network.rpc.RPCNetwork;
-import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
-import com.yahoo.messagebus.test.SimpleProtocol;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * @author Simon Thoresen Hult
- */
-public class RemoteClient {
-
- private final Slobrok slobrok;
- private final String slobrokId;
- private final MessageBus mbus;
- private final ReplyQueue queue = new ReplyQueue();
- private final SourceSession session;
-
- private RemoteClient(Slobrok slobrok, String slobrokId, Protocol protocol, boolean network) {
- this.slobrok = slobrok;
- this.slobrokId = slobrok != null ? slobrok.configId() : slobrokId;
- mbus = network
- ? new MessageBus(new RPCNetwork(new RPCNetworkParams().setSlobrokConfigId(this.slobrokId)),
- new MessageBusParams().addProtocol(protocol))
- : new MessageBus(new LocalNetwork(), new MessageBusParams().addProtocol(protocol));
- session = mbus.createSourceSession(new SourceSessionParams().setThrottlePolicy(null).setReplyHandler(queue));
- }
-
- public Result sendMessage(Message msg) {
- return session.send(msg);
- }
-
- public Reply awaitReply(int timeout, TimeUnit unit) throws InterruptedException {
- return queue.awaitReply(timeout, unit);
- }
-
- public String slobrokId() {
- return slobrokId;
- }
-
- public void close() {
- session.destroy();
- mbus.destroy();
- if (slobrok != null) {
- slobrok.stop();
- }
- }
-
- public static RemoteClient newInstanceWithInternSlobrok(boolean network) {
- return new RemoteClient(newSlobrok(), null, new SimpleProtocol(), network);
- }
-
- public static RemoteClient newInstanceWithExternSlobrok(String slobrokId, boolean network) {
- return new RemoteClient(null, slobrokId, new SimpleProtocol(), network);
- }
-
- public static RemoteClient newInstanceWithProtocolAndInternSlobrok(Protocol protocol, boolean network) {
- return new RemoteClient(newSlobrok(), null, protocol, network);
- }
-
- private static Slobrok newSlobrok() {
- Slobrok slobrok;
- try {
- slobrok = new Slobrok();
- } catch (ListenFailedException e) {
- throw new IllegalStateException(e);
- }
- return slobrok;
- }
-}
diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteServer.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteServer.java
deleted file mode 100644
index 1f0f82c4903..00000000000
--- a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/RemoteServer.java
+++ /dev/null
@@ -1,87 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.messagebus.jdisc.test;
-
-import com.yahoo.jrt.ListenFailedException;
-import com.yahoo.jrt.slobrok.server.Slobrok;
-import com.yahoo.messagebus.*;
-import com.yahoo.messagebus.network.Identity;
-import com.yahoo.messagebus.network.rpc.RPCNetwork;
-import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
-import com.yahoo.messagebus.test.SimpleProtocol;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * @author Simon Thoresen Hult
- */
-public class RemoteServer {
-
- private final Slobrok slobrok;
- private final String slobrokId;
- private final MessageBus mbus;
- private final MessageQueue queue = new MessageQueue();
- private final DestinationSession session;
-
- private RemoteServer(Slobrok slobrok, String slobrokId, Protocol protocol, String identity) {
- this.slobrok = slobrok;
- this.slobrokId = slobrok != null ? slobrok.configId() : slobrokId;
- mbus = new MessageBus(new RPCNetwork(new RPCNetworkParams()
- .setSlobrokConfigId(this.slobrokId)
- .setIdentity(new Identity(identity))),
- new MessageBusParams().addProtocol(protocol));
- session = mbus.createDestinationSession(new DestinationSessionParams().setMessageHandler(queue));
- }
-
- public String connectionSpec() {
- return session.getConnectionSpec();
- }
-
- public Message awaitMessage(int timeout, TimeUnit unit) throws InterruptedException {
- return queue.awaitMessage(timeout, unit);
- }
-
- public void ackMessage(Message msg) {
- session.acknowledge(msg);
- }
-
- public void sendReply(Reply reply) {
- session.reply(reply);
- }
-
- public String slobrokId() {
- return slobrokId;
- }
-
- public void close() {
- session.destroy();
- mbus.destroy();
- if (slobrok != null) {
- slobrok.stop();
- }
- }
-
- public static RemoteServer newInstanceWithInternSlobrok() {
- return new RemoteServer(newSlobrok(), null, new SimpleProtocol(), "remote");
- }
-
- public static RemoteServer newInstanceWithExternSlobrok(String slobrokId) {
- return new RemoteServer(null, slobrokId, new SimpleProtocol(), "remote");
- }
-
- public static RemoteServer newInstance(String slobrokId, String identity, Protocol protocol) {
- return new RemoteServer(null, slobrokId, protocol, identity);
- }
-
- public static RemoteServer newInstanceWithProtocol(Protocol protocol) {
- return new RemoteServer(newSlobrok(), null, protocol, "remote");
- }
-
- private static Slobrok newSlobrok() {
- try {
- return new Slobrok();
- } catch (ListenFailedException e) {
- throw new IllegalStateException(e);
- }
- }
-
-}
diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/ReplyQueue.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/ReplyQueue.java
deleted file mode 100644
index 6c48aab5a7f..00000000000
--- a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/ReplyQueue.java
+++ /dev/null
@@ -1,26 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.messagebus.jdisc.test;
-
-import com.yahoo.messagebus.Reply;
-import com.yahoo.messagebus.ReplyHandler;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-/**
- * @author Simon Thoresen Hult
- */
-public class ReplyQueue implements ReplyHandler {
-
- private final BlockingQueue<Reply> queue = new LinkedBlockingQueue<>();
-
- @Override
- public void handleReply(Reply reply) {
- queue.add(reply);
- }
-
- public Reply awaitReply(int timeout, TimeUnit unit) throws InterruptedException {
- return queue.poll(timeout, unit);
- }
-}
diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/ServerTestDriver.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/ServerTestDriver.java
deleted file mode 100644
index e59db28e886..00000000000
--- a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/test/ServerTestDriver.java
+++ /dev/null
@@ -1,155 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.messagebus.jdisc.test;
-
-import com.google.inject.Module;
-import com.yahoo.jdisc.application.ContainerBuilder;
-import com.yahoo.jdisc.handler.RequestHandler;
-import com.yahoo.jdisc.test.TestDriver;
-import com.yahoo.messagebus.*;
-import com.yahoo.messagebus.Error;
-import com.yahoo.messagebus.jdisc.MbusServer;
-import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
-import com.yahoo.messagebus.routing.Route;
-import com.yahoo.messagebus.shared.ServerSession;
-import com.yahoo.messagebus.shared.SharedMessageBus;
-import com.yahoo.messagebus.test.SimpleProtocol;
-
-import java.util.Arrays;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-/**
- * @author Simon Thoresen Hult
- */
-public class ServerTestDriver {
-
- private final RemoteClient client;
- private final MbusServer server;
- private final TestDriver driver;
-
- private ServerTestDriver(RemoteClient client, boolean activateContainer, RequestHandler requestHandler,
- Protocol protocol, Module... guiceModules)
- {
- this.client = client;
- driver = TestDriver.newSimpleApplicationInstanceWithoutOsgi(guiceModules);
- if (activateContainer) {
- ContainerBuilder builder = driver.newContainerBuilder();
- if (requestHandler != null) {
- builder.serverBindings().bind("mbus://*/*", requestHandler);
- }
- driver.activateContainer(builder);
- }
-
- MessageBusParams mbusParams = new MessageBusParams().addProtocol(protocol);
- RPCNetworkParams netParams = new RPCNetworkParams().setSlobrokConfigId(client.slobrokId());
- SharedMessageBus mbus = SharedMessageBus.newInstance(mbusParams, netParams);
- ServerSession session = mbus.newDestinationSession(new DestinationSessionParams());
- server = new MbusServer(driver, session);
- server.start();
- session.release();
- mbus.release();
- }
-
- public boolean sendMessage(Message msg) {
- msg.setRoute(Route.parse(server.connectionSpec()));
- msg.getTrace().setLevel(9);
- return client.sendMessage(msg).isAccepted();
- }
-
- public Reply awaitReply() {
- Reply reply = null;
- try {
- reply = client.awaitReply(60, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- if (reply != null) {
- System.out.println(reply.getTrace());
- }
- return reply;
- }
-
- public Reply awaitSuccess() {
- Reply reply = awaitReply();
- if (reply == null || reply.hasErrors()) {
- return null;
- }
- return reply;
- }
-
- public Reply awaitErrors(Integer... errCodes) {
- Reply reply = awaitReply();
- if (reply == null) {
- return null;
- }
- List<Integer> lst = new LinkedList<>(Arrays.asList(errCodes));
- for (int i = 0, len = reply.getNumErrors(); i < len; ++i) {
- Error err = reply.getError(i);
- System.out.println(err);
- int idx = lst.indexOf(err.getCode());
- if (idx < 0) {
- return null;
- }
- lst.remove(idx);
- }
- if (!lst.isEmpty()) {
- return null;
- }
- return reply;
- }
-
- public boolean close() {
- server.close();
- server.release();
- client.close();
- return driver.close();
- }
-
- public TestDriver parent() {
- return driver;
- }
-
- public RemoteClient client() {
- return client;
- }
-
- public MbusServer server() {
- return server;
- }
-
- public static ServerTestDriver newInstance(RequestHandler requestHandler, boolean network, Module... guiceModules) {
- return new ServerTestDriver(RemoteClient.newInstanceWithInternSlobrok(network), true, requestHandler,
- new SimpleProtocol(), guiceModules);
- }
-
- public static ServerTestDriver newInstanceWithProtocol(Protocol protocol, RequestHandler requestHandler,
- boolean network, Module... guiceModules)
- {
- return new ServerTestDriver(RemoteClient.newInstanceWithInternSlobrok(network), true, requestHandler, protocol,
- guiceModules);
- }
-
- public static ServerTestDriver newInstanceWithExternSlobrok(String slobrokId, RequestHandler requestHandler,
- boolean network, Module... guiceModules)
- {
- return new ServerTestDriver(RemoteClient.newInstanceWithExternSlobrok(slobrokId, network),
- true, requestHandler, new SimpleProtocol(), guiceModules);
- }
-
- public static ServerTestDriver newInactiveInstance(boolean network, Module... guiceModules) {
- return new ServerTestDriver(RemoteClient.newInstanceWithInternSlobrok(network), false, null,
- new SimpleProtocol(), guiceModules);
- }
-
- public static ServerTestDriver newInactiveInstanceWithProtocol(Protocol protocol, boolean network, Module... guiceModules) {
- return new ServerTestDriver(RemoteClient.newInstanceWithProtocolAndInternSlobrok(protocol, network), false, null,
- protocol, guiceModules);
- }
-
- public static ServerTestDriver newUnboundInstance(boolean network, Module... guiceModules) {
- return new ServerTestDriver(RemoteClient.newInstanceWithInternSlobrok(network), true, null,
- new SimpleProtocol(), guiceModules);
- }
-
-}
diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/network/package-info.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/network/package-info.java
deleted file mode 100644
index 72f563e8bd7..00000000000
--- a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/network/package-info.java
+++ /dev/null
@@ -1,5 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-@ExportPackage
-package com.yahoo.messagebus.network;
-
-import com.yahoo.osgi.annotation.ExportPackage;
diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/network/rpc/package-info.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/network/rpc/package-info.java
deleted file mode 100644
index 7b468813713..00000000000
--- a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/network/rpc/package-info.java
+++ /dev/null
@@ -1,5 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-@ExportPackage
-package com.yahoo.messagebus.network.rpc;
-
-import com.yahoo.osgi.annotation.ExportPackage;
diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/package-info.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/package-info.java
deleted file mode 100644
index 63b713e70e0..00000000000
--- a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/package-info.java
+++ /dev/null
@@ -1,5 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-@ExportPackage
-package com.yahoo.messagebus;
-
-import com.yahoo.osgi.annotation.ExportPackage;
diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/routing/package-info.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/routing/package-info.java
deleted file mode 100644
index ba8fc5fafba..00000000000
--- a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/routing/package-info.java
+++ /dev/null
@@ -1,5 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-@ExportPackage
-package com.yahoo.messagebus.routing;
-
-import com.yahoo.osgi.annotation.ExportPackage;
diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/ClientSession.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/ClientSession.java
deleted file mode 100644
index 0964a254cf2..00000000000
--- a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/ClientSession.java
+++ /dev/null
@@ -1,14 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.messagebus.shared;
-
-import com.yahoo.jdisc.SharedResource;
-import com.yahoo.messagebus.Message;
-import com.yahoo.messagebus.Result;
-
-/**
- * @author Simon Thoresen Hult
- */
-public interface ClientSession extends SharedResource {
-
- public Result sendMessage(Message msg);
-}
diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/NullNetwork.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/NullNetwork.java
deleted file mode 100644
index ad58d6b9a5e..00000000000
--- a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/NullNetwork.java
+++ /dev/null
@@ -1,73 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.messagebus.shared;
-
-import com.yahoo.jrt.slobrok.api.IMirror;
-import com.yahoo.messagebus.Message;
-import com.yahoo.messagebus.network.Network;
-import com.yahoo.messagebus.network.NetworkOwner;
-import com.yahoo.messagebus.routing.RoutingNode;
-
-import java.util.List;
-
-/**
- * <p>Used by SharedMessageBus as a network when the container runs in LocalApplication with no network services.</p>
- *
- * @author <a href="mailto:vegardh@yahoo-inc.com">Vegard Havdal</a>
- */
-class NullNetwork implements Network {
-
- @Override
- public boolean waitUntilReady(double seconds) {
- return true;
- }
-
- @Override
- public void attach(NetworkOwner owner) {
-
- }
-
- @Override
- public void registerSession(String session) {
-
- }
-
- @Override
- public void unregisterSession(String session) {
-
- }
-
- @Override
- public boolean allocServiceAddress(RoutingNode recipient) {
- return false;
- }
-
- @Override
- public void freeServiceAddress(RoutingNode recipient) {
-
- }
-
- @Override
- public void send(Message msg, List<RoutingNode> recipients) {
-
- }
-
- @Override
- public void sync() {
-
- }
-
- @Override
- public void shutdown() {
-
- }
-
- @Override
- public String getConnectionSpec() {
- return null;
- }
-
- @Override
- public IMirror getMirror() {
- return null;
- }
-}
diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/ServerSession.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/ServerSession.java
deleted file mode 100644
index 56713815c7a..00000000000
--- a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/ServerSession.java
+++ /dev/null
@@ -1,22 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.messagebus.shared;
-
-import com.yahoo.jdisc.SharedResource;
-import com.yahoo.messagebus.MessageHandler;
-import com.yahoo.messagebus.Reply;
-
-/**
- * @author Simon Thoresen Hult
- */
-public interface ServerSession extends SharedResource {
-
- public MessageHandler getMessageHandler();
-
- public void setMessageHandler(MessageHandler msgHandler);
-
- public void sendReply(Reply reply);
-
- public String connectionSpec();
-
- public String name();
-}
diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/SharedDestinationSession.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/SharedDestinationSession.java
deleted file mode 100644
index 7da164757cd..00000000000
--- a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/SharedDestinationSession.java
+++ /dev/null
@@ -1,85 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.messagebus.shared;
-
-import com.yahoo.jdisc.AbstractResource;
-import com.yahoo.jdisc.ResourceReference;
-import java.util.logging.Level;
-import com.yahoo.messagebus.DestinationSession;
-import com.yahoo.messagebus.DestinationSessionParams;
-import com.yahoo.messagebus.EmptyReply;
-import com.yahoo.messagebus.Error;
-import com.yahoo.messagebus.ErrorCode;
-import com.yahoo.messagebus.Message;
-import com.yahoo.messagebus.MessageHandler;
-import com.yahoo.messagebus.Reply;
-
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.logging.Logger;
-
-/**
- * @author Simon Thoresen Hult
- */
-public class SharedDestinationSession extends AbstractResource implements MessageHandler, ServerSession {
-
- private static final Logger log = Logger.getLogger(SharedDestinationSession.class.getName());
- private final AtomicReference<MessageHandler> msgHandler = new AtomicReference<>();
- private final DestinationSession session;
- private final ResourceReference mbusReference;
-
- SharedDestinationSession(SharedMessageBus mbus, DestinationSessionParams params) {
- this.msgHandler.set(params.getMessageHandler());
- this.session = mbus.messageBus().createDestinationSession(params.setMessageHandler(this));
- this.mbusReference = mbus.refer();
- }
-
- public DestinationSession session() {
- return session;
- }
-
- @Override
- public void sendReply(Reply reply) {
- session.reply(reply);
- }
-
- @Override
- public MessageHandler getMessageHandler() {
- return msgHandler.get();
- }
-
- @Override
- public void setMessageHandler(MessageHandler msgHandler) {
- if (!this.msgHandler.compareAndSet(null, msgHandler)) {
- throw new IllegalStateException("Message handler already registered.");
- }
- }
-
- @Override
- public void handleMessage(Message msg) {
- MessageHandler msgHandler = this.msgHandler.get();
- if (msgHandler == null) {
- Reply reply = new EmptyReply();
- reply.swapState(msg);
- reply.addError(new Error(ErrorCode.SESSION_BUSY, "Session not fully configured yet."));
- sendReply(reply);
- return;
- }
- msgHandler.handleMessage(msg);
- }
-
- @Override
- public String connectionSpec() {
- return session.getConnectionSpec();
- }
-
- @Override
- public String name() {
- return session.getName();
- }
-
- @Override
- protected void destroy() {
- log.log(Level.FINE, "Destroying shared destination session.");
- session.destroy();
- mbusReference.close();
- }
-}
diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/SharedIntermediateSession.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/SharedIntermediateSession.java
deleted file mode 100644
index 5c9fab46e34..00000000000
--- a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/SharedIntermediateSession.java
+++ /dev/null
@@ -1,104 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.messagebus.shared;
-
-import com.yahoo.jdisc.AbstractResource;
-import com.yahoo.jdisc.ResourceReference;
-import java.util.logging.Level;
-import com.yahoo.messagebus.EmptyReply;
-import com.yahoo.messagebus.Error;
-import com.yahoo.messagebus.ErrorCode;
-import com.yahoo.messagebus.IntermediateSession;
-import com.yahoo.messagebus.IntermediateSessionParams;
-import com.yahoo.messagebus.Message;
-import com.yahoo.messagebus.MessageHandler;
-import com.yahoo.messagebus.Reply;
-import com.yahoo.messagebus.ReplyHandler;
-import com.yahoo.messagebus.Result;
-
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.logging.Logger;
-
-/**
- * @author Simon Thoresen Hult
- */
-public class SharedIntermediateSession extends AbstractResource
- implements ClientSession, ServerSession, MessageHandler, ReplyHandler
-{
-
- private static final Logger log = Logger.getLogger(SharedIntermediateSession.class.getName());
- private final AtomicReference<MessageHandler> msgHandler = new AtomicReference<>();
- private final IntermediateSession session;
- private final ResourceReference mbusReference;
-
- public SharedIntermediateSession(SharedMessageBus mbus, IntermediateSessionParams params) {
- if (params.getReplyHandler() != null) {
- throw new IllegalArgumentException("Reply handler must be null.");
- }
- this.msgHandler.set(params.getMessageHandler());
- this.session = mbus.messageBus().createIntermediateSession(params.setReplyHandler(this)
- .setMessageHandler(this));
- this.mbusReference = mbus.refer();
- }
-
- public IntermediateSession session() {
- return session;
- }
-
- @Override
- public Result sendMessage(Message msg) {
- session.forward(msg);
- return Result.ACCEPTED;
- }
-
- @Override
- public void sendReply(Reply reply) {
- session.forward(reply);
- }
-
- @Override
- public MessageHandler getMessageHandler() {
- return msgHandler.get();
- }
-
- @Override
- public void setMessageHandler(MessageHandler msgHandler) {
- if (!this.msgHandler.compareAndSet(null, msgHandler)) {
- throw new IllegalStateException("Message handler already registered.");
- }
- }
-
- @Override
- public void handleMessage(Message msg) {
- MessageHandler msgHandler = this.msgHandler.get();
- if (msgHandler == null) {
- Reply reply = new EmptyReply();
- reply.swapState(msg);
- reply.addError(new Error(ErrorCode.SESSION_BUSY, "Session not fully configured yet."));
- sendReply(reply);
- return;
- }
- msgHandler.handleMessage(msg);
- }
-
- @Override
- public void handleReply(Reply reply) {
- reply.popHandler().handleReply(reply);
- }
-
- @Override
- public String connectionSpec() {
- return session.getConnectionSpec();
- }
-
- @Override
- public String name() {
- return session.getName();
- }
-
- @Override
- protected void destroy() {
- log.log(Level.FINE, "Destroying shared intermediate session.");
- session.destroy();
- mbusReference.close();
- }
-}
diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/SharedMessageBus.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/SharedMessageBus.java
deleted file mode 100644
index dd135a51378..00000000000
--- a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/SharedMessageBus.java
+++ /dev/null
@@ -1,68 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.messagebus.shared;
-
-import com.yahoo.config.subscription.ConfigGetter;
-import com.yahoo.jdisc.AbstractResource;
-import java.util.logging.Level;
-import com.yahoo.messagebus.DestinationSessionParams;
-import com.yahoo.messagebus.IntermediateSessionParams;
-import com.yahoo.messagebus.MessageBus;
-import com.yahoo.messagebus.MessageBusParams;
-import com.yahoo.messagebus.SourceSessionParams;
-import com.yahoo.messagebus.network.Network;
-import com.yahoo.messagebus.network.rpc.RPCNetwork;
-import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
-import com.yahoo.cloud.config.SlobroksConfig;
-
-import java.util.logging.Logger;
-
-/**
- * @author Simon Thoresen Hult
- */
-public class SharedMessageBus extends AbstractResource {
-
- private static final Logger log = Logger.getLogger(SharedMessageBus.class.getName());
- private final MessageBus mbus;
-
- public SharedMessageBus(MessageBus mbus) {
- mbus.getClass(); // throws NullPointerException
- this.mbus = mbus;
- }
-
- public MessageBus messageBus() {
- return mbus;
- }
-
- @Override
- protected void destroy() {
- log.log(Level.FINE, "Destroying shared message bus.");
- mbus.destroy();
- }
-
- public SharedSourceSession newSourceSession(SourceSessionParams params) {
- return new SharedSourceSession(this, params);
- }
-
- public SharedIntermediateSession newIntermediateSession(IntermediateSessionParams params) {
- return new SharedIntermediateSession(this, params);
- }
-
- public SharedDestinationSession newDestinationSession(DestinationSessionParams params) {
- return new SharedDestinationSession(this, params);
- }
-
- public static SharedMessageBus newInstance(MessageBusParams mbusParams, RPCNetworkParams netParams) {
- return new SharedMessageBus(new MessageBus(newNetwork(netParams), mbusParams));
- }
-
- private static Network newNetwork(RPCNetworkParams params) {
- SlobroksConfig cfg = params.getSlobroksConfig();
- if (cfg == null) {
- cfg = ConfigGetter.getConfig(SlobroksConfig.class, params.getSlobrokConfigId());
- }
- if (cfg.slobrok().isEmpty()) {
- return new NullNetwork(); // for LocalApplication
- }
- return new RPCNetwork(params);
- }
-}
diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/SharedSourceSession.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/SharedSourceSession.java
deleted file mode 100644
index 56071682349..00000000000
--- a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/SharedSourceSession.java
+++ /dev/null
@@ -1,58 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.messagebus.shared;
-
-import com.yahoo.jdisc.AbstractResource;
-import com.yahoo.jdisc.ResourceReference;
-import java.util.logging.Level;
-import com.yahoo.messagebus.Message;
-import com.yahoo.messagebus.Reply;
-import com.yahoo.messagebus.ReplyHandler;
-import com.yahoo.messagebus.Result;
-import com.yahoo.messagebus.SourceSession;
-import com.yahoo.messagebus.SourceSessionParams;
-
-import java.util.logging.Logger;
-
-/**
- * @author Simon Thoresen Hult
- */
-public class SharedSourceSession extends AbstractResource implements ClientSession, ReplyHandler {
-
- private static final Logger log = Logger.getLogger(SharedSourceSession.class.getName());
- private final SourceSession session;
- private final ResourceReference mbusReference;
-
- public SharedSourceSession(SharedMessageBus mbus, SourceSessionParams params) {
- if (params.getReplyHandler() != null) {
- throw new IllegalArgumentException("Reply handler must be null.");
- }
- this.session = mbus.messageBus().createSourceSession(params.setReplyHandler(this));
- this.mbusReference = mbus.refer();
- }
-
- public SourceSession session() {
- return session;
- }
-
- @Override
- public Result sendMessage(Message msg) {
- return session.send(msg);
- }
-
- public Result sendMessageBlocking(Message msg) throws InterruptedException {
- return session.sendBlocking(msg);
- }
-
- @Override
- public void handleReply(Reply reply) {
- reply.popHandler().handleReply(reply);
- }
-
- @Override
- protected void destroy() {
- log.log(Level.FINE, "Destroying shared source session.");
- session.close();
- mbusReference.close();
- }
-
-}
diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/package-info.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/package-info.java
deleted file mode 100644
index 941a0dc4c5c..00000000000
--- a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/shared/package-info.java
+++ /dev/null
@@ -1,8 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-/**
- * Not a public API, exported for use in internal components.
- */
-@ExportPackage
-package com.yahoo.messagebus.shared;
-
-import com.yahoo.osgi.annotation.ExportPackage;
diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/test/package-info.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/test/package-info.java
deleted file mode 100644
index 42bc03b6e17..00000000000
--- a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/test/package-info.java
+++ /dev/null
@@ -1,5 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-@ExportPackage
-package com.yahoo.messagebus.test;
-
-import com.yahoo.osgi.annotation.ExportPackage;
diff --git a/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/ClientThreadingTestCase.java b/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/ClientThreadingTestCase.java
deleted file mode 100644
index 62a9a864781..00000000000
--- a/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/ClientThreadingTestCase.java
+++ /dev/null
@@ -1,149 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.messagebus.jdisc;
-
-import com.yahoo.jdisc.application.ContainerBuilder;
-import com.yahoo.jdisc.handler.FutureResponse;
-import com.yahoo.jdisc.handler.ResponseHandler;
-import com.yahoo.jdisc.test.TestDriver;
-import com.yahoo.messagebus.DestinationSession;
-import com.yahoo.messagebus.DestinationSessionParams;
-import com.yahoo.messagebus.Message;
-import com.yahoo.messagebus.MessageBus;
-import com.yahoo.messagebus.MessageBusParams;
-import com.yahoo.messagebus.MessageHandler;
-import com.yahoo.messagebus.SourceSessionParams;
-import com.yahoo.messagebus.network.local.LocalNetwork;
-import com.yahoo.messagebus.network.local.LocalWire;
-import com.yahoo.messagebus.routing.Route;
-import com.yahoo.messagebus.shared.SharedMessageBus;
-import com.yahoo.messagebus.shared.SharedSourceSession;
-import com.yahoo.messagebus.test.SimpleMessage;
-import com.yahoo.messagebus.test.SimpleProtocol;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import java.net.URI;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
-
-/**
- * @author Simon Thoresen Hult
- */
-public class ClientThreadingTestCase {
-
- private static final int NUM_THREADS = 32;
- private static final int NUM_REQUESTS = 1000;
-
- @Test
- @Ignore
- public void requireThatClientIsThreadSafe() throws Exception {
- final LocalWire wire = new LocalWire();
- final Client client = new Client(wire);
- final Server server = new Server(wire);
-
- final List<Callable<Boolean>> lst = new LinkedList<>();
- final Route route = Route.parse(server.session.getConnectionSpec());
- for (int i = 0; i < NUM_THREADS; ++i) {
- lst.add(new RequestTask(client, route));
- }
- final ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);
- for (final Future<Boolean> res : executor.invokeAll(lst, 60, TimeUnit.SECONDS)) {
- assertThat(res.get(), is(true));
- }
-
- assertThat(client.close(), is(true));
- assertThat(server.close(), is(true));
- }
-
- private static final class RequestTask implements Callable<Boolean> {
-
- final Client client;
- final Route route;
-
- RequestTask(final Client client, final Route route) {
- this.client = client;
- this.route = route;
- }
-
- @Override
- public Boolean call() throws Exception {
- for (int i = 0; i < NUM_REQUESTS; ++i) {
- final FutureResponse responseHandler = new FutureResponse();
- client.send(new SimpleMessage("foo").setRoute(route), responseHandler);
- responseHandler.get(60, TimeUnit.SECONDS);
- }
- return true;
- }
- }
-
- private static class Client {
-
- final MbusClient delegate;
- final TestDriver driver;
-
- Client(final LocalWire wire) {
- driver = TestDriver.newSimpleApplicationInstanceWithoutOsgi();
- delegate = newMbusClient(wire);
-
- final ContainerBuilder builder = driver.newContainerBuilder();
- builder.clientBindings().bind("mbus://*/*", delegate);
- driver.activateContainer(builder);
- delegate.start();
- }
-
- void send(final Message msg, final ResponseHandler handler) {
- final MbusRequest request = new MbusRequest(driver, URI.create("mbus://remote/"), msg);
- request.setServerRequest(false);
- request.connect(handler).close(null);
- request.release();
- }
-
- boolean close() {
- delegate.release();
- return driver.close();
- }
- }
-
- private static class Server implements MessageHandler {
-
- final MessageBus mbus;
- final DestinationSession session;
-
- Server(final LocalWire wire) {
- mbus = new MessageBus(
- new LocalNetwork(wire),
- new MessageBusParams().addProtocol(new SimpleProtocol()));
- session = mbus.createDestinationSession(
- new DestinationSessionParams().setMessageHandler(this));
- }
-
- @Override
- public void handleMessage(final Message msg) {
- session.acknowledge(msg);
- }
-
- boolean close() {
- return session.destroy() && mbus.destroy();
- }
- }
-
- private static MbusClient newMbusClient(final LocalWire wire) {
- final SharedMessageBus mbus = new SharedMessageBus(new MessageBus(
- new LocalNetwork(wire),
- new MessageBusParams().addProtocol(new SimpleProtocol())));
- final SharedSourceSession session = mbus.newSourceSession(
- new SourceSessionParams());
- final MbusClient client = new MbusClient(session);
- session.release();
- mbus.release();
- return client;
- }
-}
diff --git a/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/MbusClientTestCase.java b/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/MbusClientTestCase.java
deleted file mode 100644
index 9cfd1fd02b9..00000000000
--- a/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/MbusClientTestCase.java
+++ /dev/null
@@ -1,345 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.messagebus.jdisc;
-
-import com.yahoo.jdisc.Request;
-import com.yahoo.jdisc.ResourceReference;
-import com.yahoo.jdisc.Response;
-import com.yahoo.jdisc.handler.CompletionHandler;
-import com.yahoo.jdisc.handler.ContentChannel;
-import com.yahoo.jdisc.handler.RequestDeniedException;
-import com.yahoo.jdisc.handler.ResponseHandler;
-import com.yahoo.messagebus.*;
-import com.yahoo.messagebus.jdisc.test.ClientTestDriver;
-import com.yahoo.messagebus.shared.ClientSession;
-import com.yahoo.messagebus.test.SimpleMessage;
-import com.yahoo.messagebus.test.SimpleReply;
-import org.junit.Test;
-
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.*;
-
-/**
- * @author Simon Thoresen Hult
- */
-public class MbusClientTestCase {
-
- @Test
- public void requireThatClientRetainsSession() {
- MySession session = new MySession();
- assertEquals(1, session.refCount);
- MbusClient client = new MbusClient(session);
- assertEquals(2, session.refCount);
- session.release();
- assertEquals(1, session.refCount);
- client.destroy();
- assertEquals(0, session.refCount);
- }
-
- @Test
- public void requireThatRequestResponseWorks() {
- ClientTestDriver driver = ClientTestDriver.newInstance();
- MyResponseHandler responseHandler = MyResponseHandler.newInstance();
- assertTrue(driver.sendMessage(new SimpleMessage("foo"), responseHandler));
- assertTrue(driver.awaitMessageAndSendReply(new EmptyReply()));
-
- Response response = responseHandler.awaitResponse();
- assertNotNull(response);
- assertEquals(Response.Status.OK, response.getStatus());
- assertTrue(driver.close());
- }
-
- @Test
- public void requireThatNonMbusRequestIsDenied() throws InterruptedException {
- ClientTestDriver driver = ClientTestDriver.newInstance();
- Request serverReq = null;
- Request clientReq = null;
- try {
- serverReq = driver.newServerRequest();
- clientReq = new Request(serverReq, URI.create("mbus://host/path"));
- clientReq.connect(MyResponseHandler.newInstance());
- fail();
- } catch (RequestDeniedException e) {
- System.out.println(e.getMessage());
- } finally {
- if (serverReq != null) {
- serverReq.release();
- }
- if (clientReq != null) {
- clientReq.release();
- }
- }
- assertTrue(driver.close());
- }
-
- @Test
- public void requireThatRequestContentDoesNotSupportWrite() throws InterruptedException {
- ClientTestDriver driver = ClientTestDriver.newInstance();
- MyResponseHandler responseHandler = MyResponseHandler.newInstance();
-
- Request request = null;
- ContentChannel content;
- try {
- request = driver.newClientRequest(new SimpleMessage("foo"));
- content = request.connect(responseHandler);
- } finally {
- if (request != null) {
- request.release();
- }
- }
- try {
- content.write(ByteBuffer.allocate(69), null);
- fail();
- } catch (UnsupportedOperationException e) {
-
- }
- content.close(null);
-
- assertTrue(driver.awaitMessageAndSendReply(new EmptyReply()));
- assertNotNull(responseHandler.awaitResponse());
- assertTrue(driver.close());
- }
-
- @Test
- public void requireThatResponseIsMbus() {
- ClientTestDriver driver = ClientTestDriver.newInstance();
- MyResponseHandler responseHandler = MyResponseHandler.newInstance();
- assertTrue(driver.sendMessage(new SimpleMessage("foo"), responseHandler));
- assertTrue(driver.awaitMessageAndSendReply(new EmptyReply()));
-
- Response response = responseHandler.awaitResponse();
- assertTrue(response instanceof MbusResponse);
- Reply reply = ((MbusResponse)response).getReply();
- assertTrue(reply instanceof EmptyReply);
- assertTrue(driver.close());
- }
-
- @Test
- public void requireThatServerReceivesGivenMessage() {
- ClientTestDriver driver = ClientTestDriver.newInstance();
- MyResponseHandler responseHandler = MyResponseHandler.newInstance();
- assertTrue(driver.sendMessage(new SimpleMessage("foo"), responseHandler));
-
- Message msg = driver.awaitMessage();
- assertTrue(msg instanceof SimpleMessage);
- assertEquals("foo", ((SimpleMessage)msg).getValue());
-
- Reply reply = new EmptyReply();
- reply.swapState(msg);
- driver.sendReply(reply);
-
- assertNotNull(responseHandler.awaitResponse());
- assertTrue(driver.close());
- }
-
- @Test
- public void requireThatClientReceivesGivenReply() {
- ClientTestDriver driver = ClientTestDriver.newInstance();
- MyResponseHandler responseHandler = MyResponseHandler.newInstance();
- assertTrue(driver.sendMessage(new SimpleMessage("foo"), responseHandler));
-
- Message msg = driver.awaitMessage(); // TODO: Timing sensitive
- assertNotNull(msg);
- Reply reply = new SimpleReply("bar");
- reply.swapState(msg);
- driver.sendReply(reply);
-
- Response response = responseHandler.awaitResponse();
- assertTrue(response instanceof MbusResponse);
- reply = ((MbusResponse)response).getReply();
- assertTrue(reply instanceof SimpleReply);
- assertEquals("bar", ((SimpleReply)reply).getValue());
- assertTrue(driver.close());
- }
-
- @Test
- public void requireThatStateIsTransferredToResponse() {
- ClientTestDriver driver = ClientTestDriver.newInstance();
- MyResponseHandler responseHandler = MyResponseHandler.newInstance();
-
- Message msg = new SimpleMessage("foo");
- Object pushedCtx = new Object();
- msg.setContext(pushedCtx);
- ReplyHandler pushedHandler = new MyReplyHandler();
- msg.pushHandler(pushedHandler);
- Object currentCtx = new Object();
- msg.setContext(currentCtx);
- msg.getTrace().setLevel(6);
- assertTrue(driver.sendMessage(msg, responseHandler));
- assertTrue(driver.awaitMessageAndSendReply(new EmptyReply()));
-
- Response response = responseHandler.awaitResponse();
- assertTrue(response.getClass().getName(), response instanceof MbusResponse);
- Reply reply = ((MbusResponse)response).getReply();
- assertSame(currentCtx, reply.getContext());
- assertEquals(6, reply.getTrace().getLevel());
- assertSame(pushedHandler, reply.popHandler());
- assertSame(pushedCtx, reply.getContext());
- assertTrue(driver.close());
- }
-
- @Test
- public void requireThatStateIsTransferredToSyncMbusSendFailureResponse() {
- ClientTestDriver driver = ClientTestDriver.newInstance();
- driver.sourceSession().close();
-
- Message msg = new SimpleMessage("foo");
- ReplyHandler pushedHandler = new MyReplyHandler();
- Object pushedCtx = new Object();
- msg.setContext(pushedCtx);
- msg.pushHandler(pushedHandler);
- Object currentCtx = new Object();
- msg.setContext(currentCtx);
- msg.getTrace().setLevel(6);
-
- MyResponseHandler responseHandler = MyResponseHandler.newInstance();
- driver.sendMessage(msg, responseHandler);
-
- Response response = responseHandler.awaitResponse();
- assertNotNull(response);
- assertTrue(response.getClass().getName(), response instanceof MbusResponse);
- Reply reply = ((MbusResponse)response).getReply();
- assertSame(currentCtx, reply.getContext());
- assertEquals(6, reply.getTrace().getLevel());
- assertSame(pushedHandler, reply.popHandler());
- assertSame(pushedCtx, reply.getContext());
- assertTrue(driver.close());
- }
-
- @Test
- public void requireThatStateIsTransferredToTimeoutResponse() throws InterruptedException {
- ClientTestDriver driver = ClientTestDriver.newInstance();
- MyResponseHandler responseHandler = MyResponseHandler.newInstance();
-
- Message msg = new SimpleMessage("foo");
- ReplyHandler pushedHandler = new MyReplyHandler();
- Object pushedCtx = new Object();
- msg.setContext(pushedCtx);
- msg.pushHandler(pushedHandler);
- Object currentCtx = new Object();
- msg.setContext(currentCtx);
- msg.getTrace().setLevel(6);
-
- Request request = driver.newClientRequest(msg);
- request.setTimeout(1, TimeUnit.MILLISECONDS);
- assertTrue(driver.sendRequest(request, responseHandler));
- request.release();
-
- Response response = responseHandler.awaitResponse();
- assertNotNull(response);
- assertTrue(response.getClass().getName(), response instanceof MbusResponse);
- Reply reply = ((MbusResponse)response).getReply();
- assertSame(currentCtx, reply.getContext());
- assertEquals(6, reply.getTrace().getLevel());
- assertSame(pushedHandler, reply.popHandler());
- assertSame(pushedCtx, reply.getContext());
- assertTrue(driver.close());
- }
-
- @Test
- public void requireThatSyncMbusSendFailureRespondsWithError() {
- ClientTestDriver driver = ClientTestDriver.newInstance();
- driver.sourceSession().close();
-
- MyResponseHandler responseHandler = MyResponseHandler.newInstance();
- driver.sendMessage(new SimpleMessage("foo"), responseHandler);
- Response response = responseHandler.awaitResponse();
- assertNotNull(response);
- assertTrue(response.getClass().getName(), response instanceof MbusResponse);
- Reply reply = ((MbusResponse)response).getReply();
- assertEquals(1, reply.getNumErrors());
- assertEquals(ErrorCode.SEND_QUEUE_CLOSED, reply.getError(0).getCode());
- assertTrue(driver.close());
- }
-
- private static class MyResponseHandler implements ResponseHandler {
-
- final MyResponseContent content;
- Response response;
-
- MyResponseHandler(MyResponseContent content) {
- this.content = content;
- }
-
- Response awaitResponse() {
- try {
- content.closeLatch.await(60, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- throw new IllegalStateException(e);
- }
- if (response instanceof MbusResponse) {
- //System.out.println(((MbusResponse)response).getReply().getTrace());
- }
- return response;
- }
-
- @Override
- public ContentChannel handleResponse(Response response) {
- this.response = response;
- return content;
- }
-
- static MyResponseHandler newInstance() {
- return new MyResponseHandler(new MyResponseContent());
- }
- }
-
- private static class MyResponseContent implements ContentChannel {
-
- final CountDownLatch writeLatch = new CountDownLatch(1);
- final CountDownLatch closeLatch = new CountDownLatch(1);
-
- @Override
- public void write(ByteBuffer buf, CompletionHandler handler) {
- if (handler != null) {
- handler.completed();
- }
- writeLatch.countDown();
- }
-
- @Override
- public void close(CompletionHandler handler) {
- if (handler != null) {
- handler.completed();
- }
- closeLatch.countDown();
- }
- }
-
- private static class MySession implements ClientSession {
-
- int refCount = 1;
-
- @Override
- public Result sendMessage(Message msg) {
- return null;
- }
-
- @Override
- public ResourceReference refer() {
- ++refCount;
- return new ResourceReference() {
- @Override
- public void close() {
- --refCount;
- }
- };
- }
-
- @Override
- public void release() {
- --refCount;
- }
- }
-
- private static class MyReplyHandler implements ReplyHandler {
-
- @Override
- public void handleReply(Reply reply) {
-
- }
- }
-}
diff --git a/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/MbusRequestHandlerTestCase.java b/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/MbusRequestHandlerTestCase.java
deleted file mode 100644
index 316ad18bae9..00000000000
--- a/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/MbusRequestHandlerTestCase.java
+++ /dev/null
@@ -1,121 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.messagebus.jdisc;
-
-import com.google.common.util.concurrent.ListenableFuture;
-import com.yahoo.jdisc.Request;
-import com.yahoo.jdisc.Response;
-import com.yahoo.jdisc.application.ContainerBuilder;
-import com.yahoo.jdisc.handler.RequestDispatch;
-import com.yahoo.jdisc.test.TestDriver;
-import com.yahoo.messagebus.EmptyReply;
-import com.yahoo.messagebus.Message;
-import com.yahoo.messagebus.Reply;
-import com.yahoo.messagebus.test.SimpleMessage;
-import org.junit.Test;
-
-import java.net.URI;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * @author Simon Thoresen Hult
- */
-public class MbusRequestHandlerTestCase {
-
- @Test
- public void requireThatNonMbusRequestThrows() throws Exception {
- final TestDriver driver = newTestDriver(SameThreadReplier.INSTANCE);
- try {
- new RequestDispatch() {
-
- @Override
- protected Request newRequest() {
- return new Request(driver, URI.create("mbus://localhost/"));
- }
- }.connect();
- fail();
- } catch (UnsupportedOperationException e) {
- assertEquals("Expected MbusRequest, got com.yahoo.jdisc.Request.", e.getMessage());
- }
- assertTrue(driver.close());
- }
-
- @Test
- public void requireThatHandlerCanRespondInSameThread() throws Exception {
- TestDriver driver = newTestDriver(SameThreadReplier.INSTANCE);
-
- Response response = dispatchMessage(driver, new SimpleMessage("msg")).get(60, TimeUnit.SECONDS);
- assertTrue(response instanceof MbusResponse);
- assertEquals(Response.Status.OK, response.getStatus());
- Reply reply = ((MbusResponse)response).getReply();
- assertTrue(reply instanceof EmptyReply);
- assertFalse(reply.hasErrors());
-
- assertTrue(driver.close());
- }
-
- @Test
- public void requireThatHandlerCanRespondInOtherThread() throws Exception {
- TestDriver driver = newTestDriver(ThreadedReplier.INSTANCE);
-
- Response response = dispatchMessage(driver, new SimpleMessage("msg")).get(60, TimeUnit.SECONDS);
- assertTrue(response instanceof MbusResponse);
- assertEquals(Response.Status.OK, response.getStatus());
- Reply reply = ((MbusResponse)response).getReply();
- assertTrue(reply instanceof EmptyReply);
- assertFalse(reply.hasErrors());
-
- assertTrue(driver.close());
- }
-
- private static TestDriver newTestDriver(MbusRequestHandler handler) {
- TestDriver driver = TestDriver.newSimpleApplicationInstanceWithoutOsgi();
- ContainerBuilder builder = driver.newContainerBuilder();
- builder.serverBindings().bind("mbus://*/*", handler);
- driver.activateContainer(builder);
- return driver;
- }
-
- private static ListenableFuture<Response> dispatchMessage(final TestDriver driver, final Message msg) {
- return new RequestDispatch() {
-
- @Override
- protected Request newRequest() {
- return new MbusRequest(driver, URI.create("mbus://localhost/"), msg);
- }
- }.dispatch();
- }
-
- private static class SameThreadReplier extends MbusRequestHandler {
-
- final static SameThreadReplier INSTANCE = new SameThreadReplier();
-
- @Override
- public void handleMessage(Message msg) {
- Reply reply = new EmptyReply();
- reply.swapState(msg);
- reply.popHandler().handleReply(reply);
- }
- }
-
- private static class ThreadedReplier extends MbusRequestHandler {
-
- final static ThreadedReplier INSTANCE = new ThreadedReplier();
-
- @Override
- public void handleMessage(final Message msg) {
- Executors.newSingleThreadExecutor().execute(new Runnable() {
-
- @Override
- public void run() {
- SameThreadReplier.INSTANCE.handleMessage(msg);
- }
- });
- }
- }
-}
diff --git a/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/MbusRequestTestCase.java b/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/MbusRequestTestCase.java
deleted file mode 100644
index c68ab4e6742..00000000000
--- a/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/MbusRequestTestCase.java
+++ /dev/null
@@ -1,73 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.messagebus.jdisc;
-
-import com.yahoo.jdisc.test.TestDriver;
-import com.yahoo.messagebus.Message;
-import com.yahoo.messagebus.test.SimpleMessage;
-import com.yahoo.text.Utf8String;
-import org.junit.Test;
-
-import java.net.URI;
-
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * @author Simon Thoresen Hult
- */
-public class MbusRequestTestCase {
-
- @Test
- public void requireThatAccessorsWork() {
- TestDriver driver = TestDriver.newSimpleApplicationInstanceWithoutOsgi();
- driver.activateContainer(driver.newContainerBuilder());
-
- MyMessage msg = new MyMessage();
- MbusRequest request = new MbusRequest(driver, URI.create("mbus://host/path"), msg);
- assertSame(msg, request.getMessage());
- request.release();
- driver.close();
- }
-
- @Test
- public void requireThatMessageCanNotBeNullInRootRequest() {
- TestDriver driver = TestDriver.newSimpleApplicationInstanceWithoutOsgi();
- driver.activateContainer(driver.newContainerBuilder());
- try {
- new MbusRequest(driver, URI.create("mbus://host/path"), null);
- fail();
- } catch (NullPointerException e) {
- // expected
- }
- assertTrue(driver.close());
- }
-
- @Test
- public void requireThatMessageCanNotBeNullInChildRequest() {
- TestDriver driver = TestDriver.newSimpleApplicationInstanceWithoutOsgi();
- driver.activateContainer(driver.newContainerBuilder());
- MbusRequest parent = new MbusRequest(driver, URI.create("mbus://host/path"), new SimpleMessage("foo"));
- try {
- new MbusRequest(parent, URI.create("mbus://host/path"), null);
- fail();
- } catch (NullPointerException e) {
- // expected
- }
- parent.release();
- assertTrue(driver.close());
- }
-
- private class MyMessage extends Message {
-
- @Override
- public Utf8String getProtocol() {
- return null;
- }
-
- @Override
- public int getType() {
- return 0;
- }
- }
-}
diff --git a/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/MbusResponseTestCase.java b/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/MbusResponseTestCase.java
deleted file mode 100644
index eb4cb949770..00000000000
--- a/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/MbusResponseTestCase.java
+++ /dev/null
@@ -1,46 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.messagebus.jdisc;
-
-import com.yahoo.jdisc.Response;
-import com.yahoo.messagebus.Reply;
-import com.yahoo.text.Utf8String;
-import org.junit.Test;
-
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.fail;
-
-/**
- * @author Simon Thoresen Hult
- */
-public class MbusResponseTestCase {
-
- @Test
- public void requireThatAccessorsWork() {
- MyReply reply = new MyReply();
- MbusResponse response = new MbusResponse(Response.Status.OK, reply);
- assertSame(reply, response.getReply());
- }
-
- @Test
- public void requireThatReplyCanNotBeNull() {
- try {
- new MbusResponse(Response.Status.OK, null);
- fail();
- } catch (NullPointerException e) {
-
- }
- }
-
- private class MyReply extends Reply {
-
- @Override
- public Utf8String getProtocol() {
- return null;
- }
-
- @Override
- public int getType() {
- return 0;
- }
- }
-}
diff --git a/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/MbusServerConformanceTest.java b/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/MbusServerConformanceTest.java
deleted file mode 100644
index bf89f3869ed..00000000000
--- a/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/MbusServerConformanceTest.java
+++ /dev/null
@@ -1,694 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.messagebus.jdisc;
-
-import com.google.inject.AbstractModule;
-import com.google.inject.Module;
-import com.yahoo.jdisc.test.ServerProviderConformanceTest;
-import com.yahoo.messagebus.DestinationSessionParams;
-import com.yahoo.messagebus.MessageBus;
-import com.yahoo.messagebus.MessageBusParams;
-import com.yahoo.messagebus.Reply;
-import com.yahoo.messagebus.ReplyHandler;
-import com.yahoo.messagebus.SourceSession;
-import com.yahoo.messagebus.SourceSessionParams;
-import com.yahoo.messagebus.network.local.LocalNetwork;
-import com.yahoo.messagebus.network.local.LocalWire;
-import com.yahoo.messagebus.routing.Route;
-import com.yahoo.messagebus.shared.ServerSession;
-import com.yahoo.messagebus.shared.SharedMessageBus;
-import com.yahoo.messagebus.test.SimpleMessage;
-import com.yahoo.messagebus.test.SimpleProtocol;
-import org.hamcrest.Matcher;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.concurrent.BlockingDeque;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.TimeUnit;
-
-import static com.yahoo.messagebus.ErrorCode.APP_FATAL_ERROR;
-import static com.yahoo.messagebus.ErrorCode.SESSION_BUSY;
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.CoreMatchers.notNullValue;
-import static org.hamcrest.CoreMatchers.nullValue;
-import static org.hamcrest.MatcherAssert.assertThat;
-
-/**
- * @author Simon Thoresen Hult
- */
-public class MbusServerConformanceTest extends ServerProviderConformanceTest {
-
- /* Many of the "success" expectations here (may) seem odd. But this is the current behavior of the
- * messagebus server. We should probably look into whether the behavior is correct in all cases.
- */
-
- @Override
- @Test
- public void testContainerNotReadyException() throws Throwable {
- new TestRunner().setRequestTimeout(100, TimeUnit.MILLISECONDS)
- .expectError(is(SESSION_BUSY))
- .executeAndClose();
- }
-
- @Override
- @Test
- public void testBindingSetNotFoundException() throws Throwable {
- new TestRunner().expectError(is(APP_FATAL_ERROR))
- .executeAndClose();
- }
-
- @Override
- @Test
- public void testNoBindingSetSelectedException() throws Throwable {
- new TestRunner().expectError(is(APP_FATAL_ERROR))
- .executeAndClose();
- }
-
- @Override
- @Test
- public void testBindingNotFoundException() throws Throwable {
- new TestRunner().expectError(is(APP_FATAL_ERROR))
- .executeAndClose();
- }
-
- @Override
- @Test
- public void testRequestHandlerWithSyncCloseResponse() throws Throwable {
- new TestRunner().expectSuccess()
- .executeAndClose();
- }
-
- @Override
- @Test
- public void testRequestHandlerWithSyncWriteResponse() throws Throwable {
- new TestRunner().expectSuccess()
- .executeAndClose();
- }
-
- @Override
- @Test
- public void testRequestHandlerWithSyncHandleResponse() throws Throwable {
- new TestRunner().expectSuccess()
- .executeAndClose();
- }
-
- @Override
- @Test
- public void testRequestHandlerWithAsyncHandleResponse() throws Throwable {
- new TestRunner().expectSuccess()
- .executeAndClose();
- }
-
- @Override
- @Test
- public void testRequestException() throws Throwable {
- new TestRunner().expectError(is(APP_FATAL_ERROR))
- .executeAndClose();
- }
-
- @Override
- @Test
- public void testRequestExceptionWithSyncCloseResponse() throws Throwable {
- new TestRunner().expectSuccess()
- .executeAndClose();
- }
-
- @Override
- @Test
- public void testRequestExceptionWithSyncWriteResponse() throws Throwable {
- new TestRunner().expectSuccess()
- .executeAndClose();
- }
-
- @Override
- @Test
- public void testRequestNondeterministicExceptionWithSyncHandleResponse() throws Throwable {
- new TestRunner().expectSuccess()
- .executeAndClose();
- }
-
- @Override
- @Test
- public void testRequestExceptionBeforeResponseWriteWithSyncHandleResponse() throws Throwable {
- new TestRunner().expectSuccess()
- .executeAndClose();
- }
-
- @Override
- @Test
- @Ignore // N/A: The messagebus protocol does not have content.
- public void testRequestExceptionAfterResponseWriteWithSyncHandleResponse() throws Throwable {
- }
-
- @Override
- @Test
- public void testRequestNondeterministicExceptionWithAsyncHandleResponse() throws Throwable {
- new TestRunner().executeAndClose();
- }
-
- @Override
- @Test
- public void testRequestExceptionBeforeResponseWriteWithAsyncHandleResponse() throws Throwable {
- new TestRunner().expectError(is(APP_FATAL_ERROR))
- .executeAndClose();
- }
-
- @Override
- @Test
- public void testRequestExceptionAfterResponseCloseNoContentWithAsyncHandleResponse() throws Throwable {
- new TestRunner().expectSuccess()
- .executeAndClose();
- }
-
- @Override
- @Test
- @Ignore // N/A: The messagebus protocol does not have content.
- public void testRequestExceptionAfterResponseWriteWithAsyncHandleResponse() throws Throwable {
- }
-
- @Override
- @Test
- @Ignore // N/A: The messagebus protocol does not have content.
- public void testRequestContentWriteWithSyncCompletion() throws Throwable {
- }
-
- @Override
- @Test
- @Ignore // N/A: The messagebus protocol does not have content.
- public void testRequestContentWriteWithAsyncCompletion() throws Throwable {
- }
-
- @Override
- @Test
- @Ignore // N/A: The messagebus protocol does not have content.
- public void testRequestContentWriteWithNondeterministicSyncFailure() throws Throwable {
- }
-
- @Override
- @Test
- @Ignore // N/A: The messagebus protocol does not have content.
- public void testRequestContentWriteWithSyncFailureBeforeResponseWrite() throws Throwable {
- }
-
- @Override
- @Test
- @Ignore // N/A: The messagebus protocol does not have content.
- public void testRequestContentWriteWithSyncFailureAfterResponseWrite() throws Throwable {
- }
-
- @Override
- @Test
- @Ignore // N/A: The messagebus protocol does not have content.
- public void testRequestContentWriteWithNondeterministicAsyncFailure() throws Throwable {
- }
-
- @Override
- @Test
- @Ignore // N/A: The messagebus protocol does not have content.
- public void testRequestContentWriteWithAsyncFailureBeforeResponseWrite() throws Throwable {
- }
-
- @Override
- @Test
- @Ignore // N/A: The messagebus protocol does not have content.
- public void testRequestContentWriteWithAsyncFailureAfterResponseWrite() throws Throwable {
- }
-
- @Override
- @Test
- @Ignore // N/A: The messagebus protocol does not have content.
- public void testRequestContentWriteWithAsyncFailureAfterResponseCloseNoContent() throws Throwable {
- }
-
- @Override
- @Test
- @Ignore // N/A: The messagebus protocol does not have content.
- public void testRequestContentWriteNondeterministicException() throws Throwable {
- }
-
- @Override
- @Test
- @Ignore // N/A: The messagebus protocol does not have content.
- public void testRequestContentWriteExceptionBeforeResponseWrite() throws Throwable {
- }
-
- @Override
- @Test
- @Ignore // N/A: The messagebus protocol does not have content.
- public void testRequestContentWriteExceptionAfterResponseWrite() throws Throwable {
- }
-
- @Override
- @Test
- @Ignore // N/A: The messagebus protocol does not have content.
- public void testRequestContentWriteExceptionAfterResponseCloseNoContent() throws Throwable {
- }
-
- @Override
- @Test
- @Ignore // N/A: The messagebus protocol does not have content.
- public void testRequestContentWriteNondeterministicExceptionWithSyncCompletion() throws Throwable {
- }
-
- @Override
- @Test
- @Ignore // N/A: The messagebus protocol does not have content.
- public void testRequestContentWriteExceptionBeforeResponseWriteWithSyncCompletion() throws Throwable {
- }
-
- @Override
- @Test
- @Ignore // N/A: The messagebus protocol does not have content.
- public void testRequestContentWriteExceptionAfterResponseWriteWithSyncCompletion() throws Throwable {
- }
-
- @Override
- @Test
- @Ignore // N/A: The messagebus protocol does not have content.
- public void testRequestContentWriteExceptionAfterResponseCloseNoContentWithSyncCompletion() throws Throwable {
- }
-
- @Override
- @Test
- @Ignore // N/A: The messagebus protocol does not have content.
- public void testRequestContentWriteNondeterministicExceptionWithAsyncCompletion() throws Throwable {
- }
-
- @Override
- @Test
- @Ignore // N/A: The messagebus protocol does not have content.
- public void testRequestContentWriteExceptionBeforeResponseWriteWithAsyncCompletion() throws Throwable {
- }
-
- @Override
- @Test
- @Ignore // N/A: The messagebus protocol does not have content.
- public void testRequestContentWriteExceptionAfterResponseWriteWithAsyncCompletion() throws Throwable {
- }
-
- @Override
- @Test
- @Ignore // N/A: The messagebus protocol does not have content.
- public void testRequestContentWriteExceptionAfterResponseCloseNoContentWithAsyncCompletion() throws Throwable {
- }
-
- @Override
- @Test
- @Ignore // N/A: The messagebus protocol does not have content.
- public void testRequestContentWriteExceptionWithNondeterministicSyncFailure() throws Throwable {
- }
-
- @Override
- @Test
- @Ignore // N/A: The messagebus protocol does not have content.
- public void testRequestContentWriteExceptionWithSyncFailureBeforeResponseWrite() throws Throwable {
- }
-
- @Override
- @Test
- @Ignore // N/A: The messagebus protocol does not have content.
- public void testRequestContentWriteExceptionWithSyncFailureAfterResponseWrite() throws Throwable {
- }
-
- @Override
- @Test
- @Ignore // N/A: The messagebus protocol does not have content.
- public void testRequestContentWriteExceptionWithSyncFailureAfterResponseCloseNoContent() throws Throwable {
- }
-
- @Override
- @Test
- @Ignore // N/A: The messagebus protocol does not have content.
- public void testRequestContentWriteExceptionWithNondeterministicAsyncFailure() throws Throwable {
- }
-
- @Override
- @Test
- @Ignore // N/A: The messagebus protocol does not have content.
- public void testRequestContentWriteExceptionWithAsyncFailureBeforeResponseWrite() throws Throwable {
- }
-
- @Override
- @Test
- @Ignore // N/A: The messagebus protocol does not have content.
- public void testRequestContentWriteExceptionWithAsyncFailureAfterResponseWrite() throws Throwable {
- }
-
- @Override
- @Test
- @Ignore // N/A: The messagebus protocol does not have content.
- public void testRequestContentWriteExceptionWithAsyncFailureAfterResponseCloseNoContent() throws Throwable {
- }
-
- @Override
- @Test
- public void testRequestContentCloseWithSyncCompletion() throws Throwable {
- new TestRunner().expectSuccess()
- .executeAndClose();
- }
-
- @Override
- @Test
- public void testRequestContentCloseWithAsyncCompletion() throws Throwable {
- new TestRunner().expectSuccess()
- .executeAndClose();
- }
-
- @Override
- @Test
- public void testRequestContentCloseWithNondeterministicSyncFailure() throws Throwable {
- new TestRunner().expectSuccess()
- .executeAndClose();
- }
-
- @Override
- @Test
- public void testRequestContentCloseWithSyncFailureBeforeResponseWrite() throws Throwable {
- new TestRunner().expectSuccess()
- .executeAndClose();
- }
-
- @Override
- @Test
- @Ignore // N/A: The messagebus protocol does not have content.
- public void testRequestContentCloseWithSyncFailureAfterResponseWrite() throws Throwable {
- }
-
- @Override
- @Test
- public void testRequestContentCloseWithSyncFailureAfterResponseCloseNoContent() throws Throwable {
- new TestRunner().expectSuccess()
- .executeAndClose();
- }
-
- @Override
- @Test
- public void testRequestContentCloseWithNondeterministicAsyncFailure() throws Throwable {
- new TestRunner().expectSuccess()
- .executeAndClose();
- }
-
- @Override
- @Test
- public void testRequestContentCloseWithAsyncFailureBeforeResponseWrite() throws Throwable {
- new TestRunner().expectSuccess()
- .executeAndClose();
- }
-
- @Override
- @Test
- @Ignore // N/A: The messagebus protocol does not have content.
- public void testRequestContentCloseWithAsyncFailureAfterResponseWrite() throws Throwable {
- }
-
- @Override
- @Test
- public void testRequestContentCloseWithAsyncFailureAfterResponseCloseNoContent() throws Throwable {
- new TestRunner().expectSuccess()
- .executeAndClose();
- }
-
- @Override
- @Test
- public void testRequestContentCloseNondeterministicException() throws Throwable {
- new TestRunner().expectSuccess()
- .executeAndClose();
- }
-
- @Override
- @Test
- public void testRequestContentCloseExceptionBeforeResponseWrite() throws Throwable {
- new TestRunner().expectSuccess()
- .executeAndClose();
- }
-
- @Override
- @Test
- @Ignore // N/A: The messagebus protocol does not have content.
- public void testRequestContentCloseExceptionAfterResponseWrite() throws Throwable {
- }
-
- @Override
- @Test
- public void testRequestContentCloseExceptionAfterResponseCloseNoContent() throws Throwable {
- new TestRunner().expectSuccess()
- .executeAndClose();
- }
-
- @Override
- @Test
- public void testRequestContentCloseNondeterministicExceptionWithSyncCompletion() throws Throwable {
- new TestRunner().expectSuccess()
- .executeAndClose();
- }
-
- @Override
- @Test
- public void testRequestContentCloseExceptionBeforeResponseWriteWithSyncCompletion() throws Throwable {
- new TestRunner().expectSuccess()
- .executeAndClose();
- }
-
- @Override
- @Test
- @Ignore // N/A: The messagebus protocol does not have content.
- public void testRequestContentCloseExceptionAfterResponseWriteWithSyncCompletion() throws Throwable {
- }
-
- @Override
- @Test
- public void testRequestContentCloseExceptionAfterResponseCloseNoContentWithSyncCompletion() throws Throwable {
- new TestRunner().expectSuccess()
- .executeAndClose();
- }
-
- @Override
- @Test
- public void testRequestContentCloseNondeterministicExceptionWithAsyncCompletion() throws Throwable {
- new TestRunner().expectSuccess()
- .executeAndClose();
- }
-
- @Override
- @Test
- public void testRequestContentCloseExceptionBeforeResponseWriteWithAsyncCompletion() throws Throwable {
- new TestRunner().expectSuccess()
- .executeAndClose();
- }
-
- @Override
- @Test
- @Ignore // N/A: The messagebus protocol does not have content.
- public void testRequestContentCloseExceptionAfterResponseWriteWithAsyncCompletion() throws Throwable {
- }
-
- @Override
- @Test
- public void testRequestContentCloseExceptionAfterResponseCloseNoContentWithAsyncCompletion() throws Throwable {
- new TestRunner().expectSuccess()
- .executeAndClose();
- }
-
- @Override
- @Test
- public void testRequestContentCloseNondeterministicExceptionWithSyncFailure() throws Throwable {
- new TestRunner().expectSuccess()
- .executeAndClose();
- }
-
- @Override
- @Test
- public void testRequestContentCloseExceptionBeforeResponseWriteWithSyncFailure() throws Throwable {
- new TestRunner().expectSuccess()
- .executeAndClose();
- }
-
- @Override
- @Test
- @Ignore // N/A: The messagebus protocol does not have content.
- public void testRequestContentCloseExceptionAfterResponseWriteWithSyncFailure() throws Throwable {
- }
-
- @Override
- @Test
- public void testRequestContentCloseExceptionAfterResponseCloseNoContentWithSyncFailure() throws Throwable {
- new TestRunner().expectSuccess()
- .executeAndClose();
- }
-
- @Override
- @Test
- public void testRequestContentCloseNondeterministicExceptionWithAsyncFailure() throws Throwable {
- new TestRunner().expectSuccess()
- .executeAndClose();
- }
-
- @Override
- @Test
- public void testRequestContentCloseExceptionBeforeResponseWriteWithAsyncFailure() throws Throwable {
- new TestRunner().expectSuccess()
- .executeAndClose();
- }
-
- @Override
- @Test
- @Ignore // N/A: The messagebus protocol does not have content.
- public void testRequestContentCloseExceptionAfterResponseWriteWithAsyncFailure() throws Throwable {
- }
-
- @Override
- @Test
- public void testRequestContentCloseExceptionAfterResponseCloseNoContentWithAsyncFailure() throws Throwable {
- new TestRunner().expectSuccess()
- .executeAndClose();
- }
-
- @Override
- @Test
- @Ignore // N/A: The messagebus protocol does not have content.
- public void testResponseWriteCompletionException() throws Throwable {
- }
-
- @Override
- @Test
- public void testResponseCloseCompletionException() throws Throwable {
- new TestRunner().expectSuccess()
- .executeAndClose();
- }
-
- @Override
- @Test
- public void testResponseCloseCompletionExceptionNoContent() throws Throwable {
- new TestRunner().expectSuccess()
- .executeAndClose();
- }
-
- private class TestRunner implements Adapter<MbusServer, MyClient, Reply> {
-
- final LocalWire wire = new LocalWire();
- final SharedMessageBus mbus;
- final ServerSession session;
- Matcher<Integer> expectedError = null;
- boolean successExpected = false;
- long timeoutMillis = TimeUnit.SECONDS.toMillis(60);
-
- TestRunner() {
- this(new MessageBusParams().addProtocol(new SimpleProtocol()),
- new DestinationSessionParams());
- }
-
- TestRunner(MessageBusParams mbusParams, DestinationSessionParams sessionParams) {
- this.mbus = new SharedMessageBus(new MessageBus(new LocalNetwork(wire), mbusParams));
- this.session = mbus.newDestinationSession(sessionParams);
- }
-
- TestRunner setRequestTimeout(long timeout, TimeUnit unit) {
- timeoutMillis = unit.toMillis(timeout);
- return this;
- }
-
- TestRunner expectError(Matcher<Integer> matcher) {
- assertThat(successExpected, is(false));
- expectedError = matcher;
- return this;
- }
-
- TestRunner expectSuccess() {
- assertThat(expectedError, is(nullValue()));
- successExpected = true;
- return this;
- }
-
- @Override
- public Module newConfigModule() {
- return new AbstractModule() {
-
- @Override
- protected void configure() {
- bind(ServerSession.class).toInstance(session);
- }
- };
- }
-
- @Override
- public Class<MbusServer> getServerProviderClass() {
- return MbusServer.class;
- }
-
- @Override
- public MyClient newClient(MbusServer server) throws Throwable {
- return new MyClient(wire, server.connectionSpec());
- }
-
- @Override
- public Reply executeRequest(MyClient client, boolean withRequestContent) throws Throwable {
- // This protocol doesn't have the concept of "request content", so if we are asked to send any, it's a bug.
- assertThat(withRequestContent, is(false));
-
- final SimpleMessage msg = new SimpleMessage("foo");
- msg.getTrace().setLevel(9);
- msg.setRoute(client.route);
- msg.setTimeRemaining(timeoutMillis);
- assertThat("client.session.send(msg).isAccepted()",
- client.session.send(msg).isAccepted(), is(true));
-
- final Reply reply = client.replies.poll(60, TimeUnit.SECONDS);
- assertThat("reply != null", reply, notNullValue());
- return reply;
- }
-
- @Override
- public Iterable<ByteBuffer> newResponseContent() {
- return Collections.emptyList();
- }
-
- @Override
- public void validateResponse(Reply reply) throws Throwable {
- final String trace = String.valueOf(reply.getTrace());
- if (expectedError != null) {
- assertThat(reply.hasErrors(), is(true));
- final int error = reply.getError(0).getCode();
- assertThat(trace, error, expectedError);
- }
- if (successExpected) {
- assertThat(trace, reply.hasErrors(), is(false));
- }
- }
-
- void executeAndClose() throws Throwable {
- runTest(this);
- session.release();
- mbus.release();
- }
- }
-
- public static class MyClient implements Closeable, ReplyHandler {
-
- final BlockingDeque<Reply> replies = new LinkedBlockingDeque<>();
- final MessageBus mbus;
- final Route route;
- final SourceSession session;
-
- MyClient(LocalWire wire, String connectionSpec) {
- this.mbus = new MessageBus(new LocalNetwork(wire),
- new MessageBusParams().addProtocol(new SimpleProtocol()));
- this.session = mbus.createSourceSession(new SourceSessionParams().setReplyHandler(this));
- this.route = Route.parse(connectionSpec);
- }
-
- @Override
- public void close() throws IOException {
- session.destroy();
- mbus.destroy();
- }
-
- @Override
- public void handleReply(Reply reply) {
- replies.addLast(reply);
- }
- }
-}
diff --git a/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/MbusServerTestCase.java b/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/MbusServerTestCase.java
deleted file mode 100644
index 9d45d2e7abf..00000000000
--- a/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/MbusServerTestCase.java
+++ /dev/null
@@ -1,374 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.messagebus.jdisc;
-
-import com.google.inject.AbstractModule;
-import com.yahoo.jdisc.Request;
-import com.yahoo.jdisc.ResourceReference;
-import com.yahoo.jdisc.Response;
-import com.yahoo.jdisc.application.BindingSetSelector;
-import com.yahoo.jdisc.handler.*;
-import com.yahoo.messagebus.Error;
-import com.yahoo.messagebus.*;
-import com.yahoo.messagebus.jdisc.test.ServerTestDriver;
-import com.yahoo.messagebus.shared.ServerSession;
-import com.yahoo.messagebus.test.SimpleMessage;
-import com.yahoo.messagebus.test.SimpleReply;
-import org.junit.Test;
-
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.*;
-
-/**
- * @author Simon Thoresen Hult
- */
-public class MbusServerTestCase {
-
- @Test
- public void requireThatServerRetainsSession() {
- MySession session = new MySession();
- assertEquals(1, session.refCount);
- MbusServer server = new MbusServer(null, session);
- assertEquals(2, session.refCount);
- session.release();
- assertEquals(1, session.refCount);
- server.destroy();
- assertEquals(0, session.refCount);
- }
-
- @Test
- public void requireThatNoBindingSetSelectedExceptionIsCaught() {
- ServerTestDriver driver = ServerTestDriver.newUnboundInstance(true, new MySelector(null));
- assertTrue(driver.sendMessage(new SimpleMessage("foo")));
- assertNotNull(driver.awaitErrors(ErrorCode.APP_FATAL_ERROR));
- assertTrue(driver.close());
- }
-
- @Test
- public void requireThatBindingSetNotFoundExceptionIsCaught() {
- ServerTestDriver driver = ServerTestDriver.newUnboundInstance(true, new MySelector("foo"));
- assertTrue(driver.sendMessage(new SimpleMessage("bar")));
- assertNotNull(driver.awaitErrors(ErrorCode.APP_FATAL_ERROR));
- assertTrue(driver.close());
- }
-
- @Test
- public void requireThatContainerNotReadyExceptionIsCaught() {
- ServerTestDriver driver = ServerTestDriver.newInactiveInstance(true);
- assertTrue(driver.sendMessage(new SimpleMessage("foo")));
- assertNotNull(driver.awaitErrors(ErrorCode.APP_FATAL_ERROR));
- assertTrue(driver.close());
- }
-
- @Test
- public void requireThatBindingNotFoundExceptionIsCaught() {
- ServerTestDriver driver = ServerTestDriver.newUnboundInstance(true);
- assertTrue(driver.sendMessage(new SimpleMessage("foo")));
- assertNotNull(driver.awaitErrors(ErrorCode.APP_FATAL_ERROR));
- assertTrue(driver.close());
- }
-
- @Test
- public void requireThatRequestDeniedExceptionIsCaught() {
- ServerTestDriver driver = ServerTestDriver.newInstance(MyRequestHandler.newRequestDenied(), true);
- assertTrue(driver.sendMessage(new SimpleMessage("foo")));
- assertNotNull(driver.awaitErrors(ErrorCode.APP_FATAL_ERROR));
- assertTrue(driver.close());
- }
-
- @Test
- public void requireThatRequestResponseWorks() {
- MyRequestHandler requestHandler = MyRequestHandler.newInstance();
- ServerTestDriver driver = ServerTestDriver.newInstance(requestHandler, true);
- assertTrue(driver.sendMessage(new SimpleMessage("foo")));
-
- assertNotNull(requestHandler.awaitRequest());
- assertTrue(requestHandler.sendResponse(new Response(Response.Status.OK)));
-
- assertNotNull(driver.awaitSuccess());
- assertTrue(driver.close());
- }
-
- @Test
- public void requireThatRequestIsMbus() {
- MyRequestHandler requestHandler = MyRequestHandler.newInstance();
- ServerTestDriver driver = ServerTestDriver.newInstance(requestHandler, true);
- assertTrue(driver.sendMessage(new SimpleMessage("foo")));
-
- Request request = requestHandler.awaitRequest();
- assertTrue(request instanceof MbusRequest);
- Message msg = ((MbusRequest)request).getMessage();
- assertTrue(msg instanceof SimpleMessage);
- assertEquals("foo", ((SimpleMessage)msg).getValue());
- assertTrue(requestHandler.sendResponse(new Response(Response.Status.OK)));
-
- assertNotNull(driver.awaitSuccess());
- assertTrue(driver.close());
- }
-
- @Test
- public void requireThatReplyInsideMbusResponseIsUsed() {
- MyRequestHandler requestHandler = MyRequestHandler.newInstance();
- ServerTestDriver driver = ServerTestDriver.newInstance(requestHandler, true);
- assertTrue(driver.sendMessage(new SimpleMessage("foo")));
-
- assertNotNull(requestHandler.awaitRequest());
- Reply reply = new SimpleReply("bar");
- reply.swapState(((MbusRequest)requestHandler.request).getMessage());
- assertTrue(requestHandler.sendResponse(new MbusResponse(Response.Status.OK, reply)));
-
- reply = driver.awaitSuccess();
- assertTrue(reply instanceof SimpleReply);
- assertEquals("bar", ((SimpleReply)reply).getValue());
-
- assertTrue(driver.close());
- }
-
- @Test
- public void requireThatNonMbusResponseCausesEmptyReply() {
- MyRequestHandler requestHandler = MyRequestHandler.newInstance();
- ServerTestDriver driver = ServerTestDriver.newInstance(requestHandler, true);
- assertTrue(driver.sendMessage(new SimpleMessage("foo")));
-
- assertNotNull(requestHandler.awaitRequest());
- assertTrue(requestHandler.sendResponse(new Response(Response.Status.OK)));
-
- assertNotNull(driver.awaitSuccess());
- assertTrue(driver.close());
- }
-
- @Test
- public void requireThatMbusRequestContentCallsCompletion() throws InterruptedException {
- MyRequestHandler requestHandler = MyRequestHandler.newInstance();
- ServerTestDriver driver = ServerTestDriver.newInstance(requestHandler, true);
- assertTrue(driver.sendMessage(new SimpleMessage("foo")));
-
- assertNotNull(requestHandler.awaitRequest());
- ContentChannel content = requestHandler.responseHandler.handleResponse(new Response(Response.Status.OK));
- assertNotNull(content);
- MyCompletion completion = new MyCompletion();
- content.close(completion);
- assertTrue(completion.completedLatch.await(60, TimeUnit.SECONDS));
-
- assertNotNull(driver.awaitSuccess());
- assertTrue(driver.close());
- }
-
- @Test
- public void requireThatResponseContentDoesNotSupportWrite() {
- MyRequestHandler requestHandler = MyRequestHandler.newInstance();
- ServerTestDriver driver = ServerTestDriver.newInstance(requestHandler, true);
- assertTrue(driver.sendMessage(new SimpleMessage("foo")));
-
- assertNotNull(requestHandler.awaitRequest());
- ContentChannel content = requestHandler.responseHandler.handleResponse(new Response(Response.Status.OK));
- assertNotNull(content);
- try {
- content.write(ByteBuffer.allocate(69), null);
- fail();
- } catch (UnsupportedOperationException e) {
-
- }
- content.close(null);
-
- assertNotNull(driver.awaitSuccess());
- assertTrue(driver.close());
- }
-
- @Test
- public void requireThatResponseErrorCodeDoesNotDuplicateReplyError() {
- assertError(Collections.<Integer>emptyList(),
- Response.Status.OK);
- assertError(Arrays.asList(ErrorCode.APP_FATAL_ERROR),
- Response.Status.BAD_REQUEST);
- assertError(Arrays.asList(ErrorCode.FATAL_ERROR),
- Response.Status.BAD_REQUEST, ErrorCode.FATAL_ERROR);
- assertError(Arrays.asList(ErrorCode.TRANSIENT_ERROR, ErrorCode.APP_FATAL_ERROR),
- Response.Status.BAD_REQUEST, ErrorCode.TRANSIENT_ERROR);
- assertError(Arrays.asList(ErrorCode.FATAL_ERROR, ErrorCode.TRANSIENT_ERROR),
- Response.Status.BAD_REQUEST, ErrorCode.FATAL_ERROR, ErrorCode.TRANSIENT_ERROR);
- }
-
- private static void assertError(List<Integer> expectedErrors, int responseStatus, int... responseErrors) {
- MyRequestHandler requestHandler = MyRequestHandler.newInstance();
- ServerTestDriver driver = ServerTestDriver.newInstance(requestHandler, true);
- assertTrue(driver.sendMessage(new SimpleMessage("foo")));
-
- assertNotNull(requestHandler.awaitRequest());
- Reply reply = new SimpleReply("bar");
- reply.swapState(((MbusRequest)requestHandler.request).getMessage());
- for (int err : responseErrors) {
- reply.addError(new Error(err, "err"));
- }
- assertTrue(requestHandler.sendResponse(new MbusResponse(responseStatus, reply)));
-
- assertNotNull(reply = driver.awaitReply());
- List<Integer> actual = new LinkedList<>();
- for (int i = 0; i < reply.getNumErrors(); ++i) {
- actual.add(reply.getError(i).getCode());
- }
- assertEquals(expectedErrors, actual);
- assertTrue(driver.close());
- }
-
- private static class MySelector extends AbstractModule implements BindingSetSelector {
-
- final String bindingSet;
-
- MySelector(String bindingSet) {
- this.bindingSet = bindingSet;
- }
-
- @Override
- protected void configure() {
- bind(BindingSetSelector.class).toInstance(this);
- }
-
- @Override
- public String select(URI uri) {
- return bindingSet;
- }
- }
-
- private static class MyRequestHandler extends AbstractRequestHandler {
-
- final MyRequestContent content;
- Request request;
- ResponseHandler responseHandler;
-
- MyRequestHandler(MyRequestContent content) {
- this.content = content;
- }
-
- @Override
- public ContentChannel handleRequest(Request request, ResponseHandler responseHandler) {
- this.request = request;
- this.responseHandler = responseHandler;
- if (content == null) {
- throw new RequestDeniedException(request);
- }
- return content;
- }
-
- Request awaitRequest() {
- try {
- if (!content.closeLatch.await(60, TimeUnit.SECONDS)) {
- return null;
- }
- } catch (InterruptedException e) {
- throw new IllegalStateException(e);
- }
- if (request instanceof MbusRequest) {
- ((MbusRequest)request).getMessage().getTrace().trace(0, "Request received by DISC.");
- }
- return request;
- }
-
- boolean sendResponse(Response response) {
- ContentChannel content = responseHandler.handleResponse(response);
- if (content == null) {
- return false;
- }
- content.close(null);
- return true;
- }
-
- static MyRequestHandler newInstance() {
- return new MyRequestHandler(new MyRequestContent());
- }
-
- static MyRequestHandler newRequestDenied() {
- return new MyRequestHandler(null);
- }
- }
-
- private static class MyRequestContent implements ContentChannel {
-
- final CountDownLatch writeLatch = new CountDownLatch(1);
- final CountDownLatch closeLatch = new CountDownLatch(1);
-
- @Override
- public void write(ByteBuffer buf, CompletionHandler handler) {
- if (handler != null) {
- handler.completed();
- }
- writeLatch.countDown();
- }
-
- @Override
- public void close(CompletionHandler handler) {
- if (handler != null) {
- handler.completed();
- }
- closeLatch.countDown();
- }
- }
-
- private static class MyCompletion implements CompletionHandler {
-
- final CountDownLatch completedLatch = new CountDownLatch(1);
-
- @Override
- public void completed() {
- completedLatch.countDown();
- }
-
- @Override
- public void failed(Throwable t) {
-
- }
- }
-
- private static class MySession implements ServerSession {
-
- int refCount = 1;
-
- @Override
- public void sendReply(Reply reply) {
-
- }
-
- @Override
- public MessageHandler getMessageHandler() {
- return null;
- }
-
- @Override
- public void setMessageHandler(MessageHandler msgHandler) {
-
- }
-
- @Override
- public String connectionSpec() {
- return null;
- }
-
- @Override
- public String name() {
- return null;
- }
-
- @Override
- public ResourceReference refer() {
- ++refCount;
- return new ResourceReference() {
- @Override
- public void close() {
- --refCount;
- }
- };
- }
-
- @Override
- public void release() {
- --refCount;
- }
- }
-}
diff --git a/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/ServerThreadingTestCase.java b/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/ServerThreadingTestCase.java
deleted file mode 100644
index a7ee355094f..00000000000
--- a/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/ServerThreadingTestCase.java
+++ /dev/null
@@ -1,137 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.messagebus.jdisc;
-
-import com.yahoo.jdisc.application.ContainerBuilder;
-import com.yahoo.jdisc.service.CurrentContainer;
-import com.yahoo.jdisc.test.TestDriver;
-import com.yahoo.messagebus.DestinationSessionParams;
-import com.yahoo.messagebus.EmptyReply;
-import com.yahoo.messagebus.Message;
-import com.yahoo.messagebus.MessageBus;
-import com.yahoo.messagebus.MessageBusParams;
-import com.yahoo.messagebus.Reply;
-import com.yahoo.messagebus.ReplyHandler;
-import com.yahoo.messagebus.SourceSession;
-import com.yahoo.messagebus.SourceSessionParams;
-import com.yahoo.messagebus.network.local.LocalNetwork;
-import com.yahoo.messagebus.network.local.LocalWire;
-import com.yahoo.messagebus.routing.Route;
-import com.yahoo.messagebus.shared.SharedDestinationSession;
-import com.yahoo.messagebus.shared.SharedMessageBus;
-import com.yahoo.messagebus.test.SimpleMessage;
-import com.yahoo.messagebus.test.SimpleProtocol;
-import org.junit.Test;
-
-import java.util.concurrent.BlockingDeque;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.TimeUnit;
-
-import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-
-/**
- * @author Simon Thoresen Hult
- */
-public class ServerThreadingTestCase {
-
- private static final int NUM_THREADS = 32;
- private static final int NUM_REQUESTS = 1000;
-
- @Test
- public void requireThatServerIsThreadSafe() throws Exception {
- final LocalWire wire = new LocalWire();
- final Client client = new Client(wire);
- final Server server = new Server(wire);
-
- for (int i = 0; i < NUM_REQUESTS; ++i) {
- final Message msg = new SimpleMessage("foo");
- msg.setRoute(Route.parse(server.delegate.connectionSpec()));
- msg.pushHandler(client);
- assertThat(client.session.send(msg).isAccepted(), is(true));
- }
- for (int i = 0; i < NUM_REQUESTS; ++i) {
- final Reply reply = client.replies.poll(600, TimeUnit.SECONDS);
- assertThat(reply, instanceOf(EmptyReply.class));
- assertThat(reply.hasErrors(), is(false));
- }
-
- assertThat(client.close(), is(true));
- assertThat(server.close(), is(true));
- }
-
- private static class Client implements ReplyHandler {
-
- final BlockingDeque<Reply> replies = new LinkedBlockingDeque<>();
- final MessageBus mbus;
- final SourceSession session;
-
- Client(final LocalWire wire) {
- mbus = new MessageBus(
- new LocalNetwork(wire),
- new MessageBusParams().addProtocol(new SimpleProtocol()));
- session = mbus.createSourceSession(
- new SourceSessionParams()
- .setReplyHandler(this)
- .setThrottlePolicy(null));
- }
-
- @Override
- public void handleReply(final Reply reply) {
- replies.addLast(reply);
- }
-
- boolean close() {
- return session.destroy() && mbus.destroy();
- }
- }
-
- private static class Server extends MbusRequestHandler {
-
- final Executor executor = Executors.newFixedThreadPool(NUM_THREADS);
- final MbusServer delegate;
- final TestDriver driver;
-
- Server(final LocalWire wire) {
- driver = TestDriver.newSimpleApplicationInstanceWithoutOsgi();
- delegate = newMbusServer(driver, wire);
-
- final ContainerBuilder builder = driver.newContainerBuilder();
- builder.serverBindings().bind("mbus://*/*", this);
- driver.activateContainer(builder);
- delegate.start();
- }
-
- @Override
- public void handleMessage(final Message msg) {
- executor.execute(new Runnable() {
-
- @Override
- public void run() {
- final Reply reply = new EmptyReply();
- reply.swapState(msg);
- reply.popHandler().handleReply(reply);
- }
- });
- }
-
- boolean close() {
- delegate.release();
- return driver.close();
- }
- }
-
- private static MbusServer newMbusServer(final CurrentContainer container, final LocalWire wire) {
- final SharedMessageBus mbus = new SharedMessageBus(new MessageBus(
- new LocalNetwork(wire),
- new MessageBusParams().addProtocol(new SimpleProtocol())));
- final SharedDestinationSession session = mbus.newDestinationSession(
- new DestinationSessionParams());
- final MbusServer server = new MbusServer(container, session);
- session.release();
- mbus.release();
- return server;
- }
-}
diff --git a/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/test/ClientTestDriverTestCase.java b/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/test/ClientTestDriverTestCase.java
deleted file mode 100644
index ef290a070cb..00000000000
--- a/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/test/ClientTestDriverTestCase.java
+++ /dev/null
@@ -1,32 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.messagebus.jdisc.test;
-
-import com.yahoo.jrt.ListenFailedException;
-import com.yahoo.jrt.slobrok.server.Slobrok;
-import com.yahoo.messagebus.test.SimpleProtocol;
-import org.junit.Test;
-
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-/**
- * @author Simon Thoresen Hult
- */
-public class ClientTestDriverTestCase {
-
- @Test
- public void requireThatFactoryMethodsWork() throws ListenFailedException {
- ClientTestDriver driver = ClientTestDriver.newInstance();
- assertNotNull(driver);
- assertTrue(driver.close());
-
- driver = ClientTestDriver.newInstanceWithProtocol(new SimpleProtocol());
- assertNotNull(driver);
- assertTrue(driver.close());
-
- Slobrok slobrok = new Slobrok();
- driver = ClientTestDriver.newInstanceWithExternSlobrok(slobrok.configId());
- assertNotNull(driver);
- assertTrue(driver.close());
- }
-}
diff --git a/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/test/ServerTestDriverTestCase.java b/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/test/ServerTestDriverTestCase.java
deleted file mode 100644
index f6ae2335d12..00000000000
--- a/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/test/ServerTestDriverTestCase.java
+++ /dev/null
@@ -1,34 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.messagebus.jdisc.test;
-
-import com.yahoo.jdisc.test.NonWorkingRequestHandler;
-import com.yahoo.jrt.ListenFailedException;
-import com.yahoo.jrt.slobrok.server.Slobrok;
-import com.yahoo.messagebus.test.SimpleProtocol;
-import org.junit.Test;
-
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-/**
- * @author Simon Thoresen Hult
- */
-public class ServerTestDriverTestCase {
-
- @Test
- public void requireThatFactoryMethodsWork() throws ListenFailedException {
- ServerTestDriver driver = ServerTestDriver.newInstance(new NonWorkingRequestHandler(), false);
- assertNotNull(driver);
- assertTrue(driver.close());
-
- driver = ServerTestDriver.newInstanceWithProtocol(new SimpleProtocol(), new NonWorkingRequestHandler(), false);
- assertNotNull(driver);
- assertTrue(driver.close());
-
- Slobrok slobrok = new Slobrok();
- driver = ServerTestDriver.newInstanceWithExternSlobrok(slobrok.configId(), new NonWorkingRequestHandler(), false);
- assertNotNull(driver);
- assertTrue(driver.close());
- }
-
-}
diff --git a/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/shared/SharedDestinationSessionTestCase.java b/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/shared/SharedDestinationSessionTestCase.java
deleted file mode 100644
index 78e79da4b9f..00000000000
--- a/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/shared/SharedDestinationSessionTestCase.java
+++ /dev/null
@@ -1,134 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.messagebus.shared;
-
-import com.yahoo.jrt.ListenFailedException;
-import com.yahoo.jrt.slobrok.server.Slobrok;
-import com.yahoo.messagebus.*;
-import com.yahoo.messagebus.jdisc.test.MessageQueue;
-import com.yahoo.messagebus.jdisc.test.RemoteClient;
-import com.yahoo.messagebus.jdisc.test.ReplyQueue;
-import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
-import com.yahoo.messagebus.routing.Route;
-import com.yahoo.messagebus.test.SimpleMessage;
-import com.yahoo.messagebus.test.SimpleProtocol;
-import com.yahoo.messagebus.test.SimpleReply;
-import org.junit.Test;
-
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.*;
-
-/**
- * @author Simon Thoresen Hult
- */
-public class SharedDestinationSessionTestCase {
-
- @Test
- public void requireThatMessageHandlerCanBeAccessed() {
- SharedDestinationSession session = newDestinationSession();
- assertNull(session.getMessageHandler());
-
- MessageQueue handler = new MessageQueue();
- session.setMessageHandler(handler);
- assertSame(handler, session.getMessageHandler());
- }
-
- @Test
- public void requireThatMessageHandlerCanOnlyBeSetOnce() {
- SharedDestinationSession session = newDestinationSession();
- session.setMessageHandler(new MessageQueue());
- try {
- session.setMessageHandler(new MessageQueue());
- fail();
- } catch (IllegalStateException e) {
- assertEquals("Message handler already registered.", e.getMessage());
- }
- session.release();
- }
-
- @Test
- public void requireThatMessageHandlerIsCalled() throws InterruptedException {
- SharedDestinationSession session = newDestinationSession();
- MessageQueue queue = new MessageQueue();
- session.setMessageHandler(queue);
- session.handleMessage(new SimpleMessage("foo"));
- assertNotNull(queue.awaitMessage(60, TimeUnit.SECONDS));
- session.release();
- }
-
- @Test
- public void requireThatSessionRepliesIfMessageHandlerIsNull() throws InterruptedException {
- SharedDestinationSession session = newDestinationSession();
- Message msg = new SimpleMessage("foo");
- ReplyQueue queue = new ReplyQueue();
- msg.pushHandler(queue);
- session.handleMessage(msg);
- Reply reply = queue.awaitReply(60, TimeUnit.SECONDS);
- assertNotNull(reply);
- assertEquals(1, reply.getNumErrors());
- assertEquals(ErrorCode.SESSION_BUSY, reply.getError(0).getCode());
- session.release();
- }
-
- @Test
- public void requireThatSessionIsClosedOnDestroy() {
- SharedDestinationSession session = newDestinationSession();
- session.release();
- assertFalse("DestinationSession not destroyed by release().", session.session().destroy());
- }
-
- @Test
- public void requireThatMbusIsReleasedOnDestroy() {
- Slobrok slobrok = null;
- try {
- slobrok = new Slobrok();
- } catch (ListenFailedException e) {
- fail();
- }
- RPCNetworkParams netParams = new RPCNetworkParams().setSlobrokConfigId(slobrok.configId());
- SharedMessageBus mbus = SharedMessageBus.newInstance(new MessageBusParams(), netParams);
- SharedDestinationSession session = mbus.newDestinationSession(new DestinationSessionParams());
- mbus.release();
- session.release();
- assertFalse("MessageBus not destroyed by release().", mbus.messageBus().destroy());
- }
-
- @Test
- public void requireThatSessionCanSendReply() throws InterruptedException {
- RemoteClient client = RemoteClient.newInstanceWithInternSlobrok(true);
- MessageQueue queue = new MessageQueue();
- DestinationSessionParams params = new DestinationSessionParams().setMessageHandler(queue);
- SharedDestinationSession session = newDestinationSession(client.slobrokId(), params);
- Route route = Route.parse(session.connectionSpec());
-
- assertTrue(client.sendMessage(new SimpleMessage("foo").setRoute(route)).isAccepted());
- Message msg = queue.awaitMessage(60, TimeUnit.SECONDS);
- assertNotNull(msg);
- Reply reply = new SimpleReply("bar");
- reply.swapState(msg);
- session.sendReply(reply);
- assertNotNull(client.awaitReply(60, TimeUnit.SECONDS));
-
- session.release();
- client.close();
- }
-
- private static SharedDestinationSession newDestinationSession() {
- Slobrok slobrok = null;
- try {
- slobrok = new Slobrok();
- } catch (ListenFailedException e) {
- fail();
- }
- return newDestinationSession(slobrok.configId(), new DestinationSessionParams());
- }
-
- private static SharedDestinationSession newDestinationSession(String slobrokId, DestinationSessionParams params) {
- RPCNetworkParams netParams = new RPCNetworkParams().setSlobrokConfigId(slobrokId);
- MessageBusParams mbusParams = new MessageBusParams().addProtocol(new SimpleProtocol());
- SharedMessageBus mbus = SharedMessageBus.newInstance(mbusParams, netParams);
- SharedDestinationSession session = mbus.newDestinationSession(params);
- mbus.release();
- return session;
- }
-}
diff --git a/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/shared/SharedIntermediateSessionTestCase.java b/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/shared/SharedIntermediateSessionTestCase.java
deleted file mode 100644
index 87958415149..00000000000
--- a/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/shared/SharedIntermediateSessionTestCase.java
+++ /dev/null
@@ -1,174 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.messagebus.shared;
-
-import com.yahoo.jrt.ListenFailedException;
-import com.yahoo.jrt.slobrok.server.Slobrok;
-import com.yahoo.messagebus.*;
-import com.yahoo.messagebus.jdisc.test.MessageQueue;
-import com.yahoo.messagebus.jdisc.test.RemoteClient;
-import com.yahoo.messagebus.jdisc.test.RemoteServer;
-import com.yahoo.messagebus.jdisc.test.ReplyQueue;
-import com.yahoo.messagebus.network.local.LocalNetwork;
-import com.yahoo.messagebus.network.local.LocalWire;
-import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
-import com.yahoo.messagebus.routing.Route;
-import com.yahoo.messagebus.test.SimpleMessage;
-import com.yahoo.messagebus.test.SimpleProtocol;
-import com.yahoo.messagebus.test.SimpleReply;
-import org.junit.Test;
-
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.*;
-
-/**
- * @author Simon Thoresen Hult
- */
-public class SharedIntermediateSessionTestCase {
-
- @Test
- public void requireThatMessageHandlerCanBeAccessed() {
- SharedIntermediateSession session = newIntermediateSession(false);
- assertNull(session.getMessageHandler());
-
- MessageQueue handler = new MessageQueue();
- session.setMessageHandler(handler);
- assertSame(handler, session.getMessageHandler());
- }
-
- @Test
- public void requireThatMessageHandlerCanOnlyBeSetOnce() {
- SharedIntermediateSession session = newIntermediateSession(false);
- session.setMessageHandler(new MessageQueue());
- try {
- session.setMessageHandler(new MessageQueue());
- fail();
- } catch (IllegalStateException e) {
- assertEquals("Message handler already registered.", e.getMessage());
- }
- session.release();
- }
-
- @Test
- public void requireThatMessageHandlerIsCalled() throws InterruptedException {
- SharedIntermediateSession session = newIntermediateSession(false);
- MessageQueue queue = new MessageQueue();
- session.setMessageHandler(queue);
- session.handleMessage(new SimpleMessage("foo"));
- assertNotNull(queue.awaitMessage(60, TimeUnit.SECONDS));
- session.release();
- }
-
- @Test
- public void requireThatSessionRepliesIfMessageHandlerIsNull() throws InterruptedException {
- SharedIntermediateSession session = newIntermediateSession(false);
- Message msg = new SimpleMessage("foo");
- ReplyQueue queue = new ReplyQueue();
- msg.pushHandler(queue);
- session.handleMessage(msg);
- Reply reply = queue.awaitReply(60, TimeUnit.SECONDS);
- assertNotNull(reply);
- assertEquals(1, reply.getNumErrors());
- assertEquals(ErrorCode.SESSION_BUSY, reply.getError(0).getCode());
- session.release();
- }
-
- @Test
- public void requireThatReplyHandlerCanNotBeSet() throws ListenFailedException {
- Slobrok slobrok = new Slobrok();
- try {
- newIntermediateSession(slobrok.configId(),
- new IntermediateSessionParams().setReplyHandler(new ReplyQueue()),
- false);
- fail();
- } catch (IllegalArgumentException e) {
- assertEquals("Reply handler must be null.", e.getMessage());
- }
- }
-
- @Test
- public void requireThatSessionIsClosedOnDestroy() {
- SharedIntermediateSession session = newIntermediateSession(false);
- session.release();
- assertFalse("IntermediateSession not destroyed by release().", session.session().destroy());
- }
-
- @Test
- public void requireThatMbusIsReleasedOnDestroy() {
- try {
- new Slobrok();
- } catch (ListenFailedException e) {
- fail();
- }
- SharedMessageBus mbus = new SharedMessageBus(new MessageBus(new LocalNetwork(new LocalWire()), new MessageBusParams()));
-
- SharedIntermediateSession session = mbus.newIntermediateSession(new IntermediateSessionParams());
- mbus.release();
- session.release();
- assertFalse("MessageBus not destroyed by release().", mbus.messageBus().destroy());
- }
-
- @Test
- public void requireThatSessionCanSendMessage() throws InterruptedException {
- RemoteServer server = RemoteServer.newInstanceWithInternSlobrok();
- SharedIntermediateSession session = newIntermediateSession(server.slobrokId(),
- new IntermediateSessionParams(),
- true);
- ReplyQueue queue = new ReplyQueue();
- Message msg = new SimpleMessage("foo").setRoute(Route.parse(server.connectionSpec()));
- msg.setTimeReceivedNow();
- msg.setTimeRemaining(60000);
- msg.getTrace().setLevel(9);
- msg.pushHandler(queue);
- assertTrue(session.sendMessage(msg).isAccepted());
- assertNotNull(msg = server.awaitMessage(60, TimeUnit.SECONDS));
- server.ackMessage(msg);
- assertNotNull(queue.awaitReply(60, TimeUnit.SECONDS));
-
- session.release();
- server.close();
- }
-
- @Test
- public void requireThatSessionCanSendReply() throws InterruptedException {
- RemoteClient client = RemoteClient.newInstanceWithInternSlobrok(true);
- MessageQueue queue = new MessageQueue();
- IntermediateSessionParams params = new IntermediateSessionParams().setMessageHandler(queue);
- SharedIntermediateSession session = newIntermediateSession(client.slobrokId(), params, true);
- Route route = Route.parse(session.connectionSpec());
-
- assertTrue(client.sendMessage(new SimpleMessage("foo").setRoute(route)).isAccepted());
- Message msg = queue.awaitMessage(60, TimeUnit.SECONDS);
- assertNotNull(msg);
- Reply reply = new SimpleReply("bar");
- reply.swapState(msg);
- session.sendReply(reply);
- assertNotNull(client.awaitReply(60, TimeUnit.SECONDS));
-
- session.release();
- client.close();
- }
-
- private static SharedIntermediateSession newIntermediateSession(boolean network) {
- Slobrok slobrok = null;
- try {
- slobrok = new Slobrok();
- } catch (ListenFailedException e) {
- fail();
- }
- return newIntermediateSession(slobrok.configId(), new IntermediateSessionParams(), network);
- }
-
- private static SharedIntermediateSession newIntermediateSession(String slobrokId,
- IntermediateSessionParams params,
- boolean network) {
- RPCNetworkParams netParams = new RPCNetworkParams().setSlobrokConfigId(slobrokId);
- MessageBusParams mbusParams = new MessageBusParams().addProtocol(new SimpleProtocol());
- SharedMessageBus mbus = network
- ? SharedMessageBus.newInstance(mbusParams, netParams)
- : new SharedMessageBus(new MessageBus(new LocalNetwork(new LocalWire()), mbusParams));
- SharedIntermediateSession session = mbus.newIntermediateSession(params);
- mbus.release();
- return session;
- }
-}
diff --git a/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/shared/SharedMessageBusTestCase.java b/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/shared/SharedMessageBusTestCase.java
deleted file mode 100644
index a54489a89e6..00000000000
--- a/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/shared/SharedMessageBusTestCase.java
+++ /dev/null
@@ -1,37 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.messagebus.shared;
-
-import com.yahoo.jrt.ListenFailedException;
-import com.yahoo.jrt.slobrok.server.Slobrok;
-import com.yahoo.messagebus.MessageBusParams;
-import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
-import org.junit.Test;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.fail;
-
-/**
- * @author Simon Thoresen Hult
- */
-public class SharedMessageBusTestCase {
-
- @Test
- public void requireThatMbusCanNotBeNull() {
- try {
- new SharedMessageBus(null);
- fail();
- } catch (NullPointerException e) {
- // expected
- }
- }
-
- @Test
- public void requireThatMbusIsClosedOnDestroy() throws ListenFailedException {
- Slobrok slobrok = new Slobrok();
- SharedMessageBus mbus = SharedMessageBus.newInstance(new MessageBusParams(),
- new RPCNetworkParams()
- .setSlobrokConfigId(slobrok.configId()));
- mbus.release();
- assertFalse(mbus.messageBus().destroy());
- }
-}
diff --git a/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/shared/SharedSourceSessionTestCase.java b/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/shared/SharedSourceSessionTestCase.java
deleted file mode 100644
index 1f0966fc961..00000000000
--- a/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/shared/SharedSourceSessionTestCase.java
+++ /dev/null
@@ -1,94 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.messagebus.shared;
-
-import com.yahoo.jrt.ListenFailedException;
-import com.yahoo.jrt.slobrok.server.Slobrok;
-import com.yahoo.messagebus.Message;
-import com.yahoo.messagebus.MessageBusParams;
-import com.yahoo.messagebus.SourceSessionParams;
-import com.yahoo.messagebus.jdisc.test.RemoteServer;
-import com.yahoo.messagebus.jdisc.test.ReplyQueue;
-import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
-import com.yahoo.messagebus.routing.Route;
-import com.yahoo.messagebus.test.SimpleMessage;
-import com.yahoo.messagebus.test.SimpleProtocol;
-import org.junit.Test;
-
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.*;
-
-/**
- * @author Simon Thoresen Hult
- */
-public class SharedSourceSessionTestCase {
-
- @Test
- public void requireThatReplyHandlerCanNotBeSet() {
- try {
- newSourceSession(new SourceSessionParams().setReplyHandler(new ReplyQueue()));
- fail();
- } catch (IllegalArgumentException e) {
- assertEquals("Reply handler must be null.", e.getMessage());
- }
- }
-
- @Test
- public void requireThatSessionIsClosedOnDestroy() {
- SharedSourceSession session = newSourceSession(new SourceSessionParams());
- session.release();
- assertFalse("SourceSession not destroyed by release().", session.session().destroy());
- }
-
- @Test
- public void requireThatMbusIsReleasedOnDestroy() {
- Slobrok slobrok = null;
- try {
- slobrok = new Slobrok();
- } catch (ListenFailedException e) {
- fail();
- }
- RPCNetworkParams netParams = new RPCNetworkParams().setSlobrokConfigId(slobrok.configId());
- SharedMessageBus mbus = SharedMessageBus.newInstance(new MessageBusParams(), netParams);
- SharedSourceSession session = mbus.newSourceSession(new SourceSessionParams());
- mbus.release();
- session.release();
- assertFalse("MessageBus not destroyed by release().", mbus.messageBus().destroy());
- }
-
- @Test
- public void requireThatSessionCanSendMessage() throws InterruptedException {
- RemoteServer server = RemoteServer.newInstanceWithInternSlobrok();
- SharedSourceSession session = newSourceSession(server.slobrokId(),
- new SourceSessionParams());
- ReplyQueue queue = new ReplyQueue();
- Message msg = new SimpleMessage("foo").setRoute(Route.parse(server.connectionSpec()));
- msg.pushHandler(queue);
- assertTrue(session.sendMessage(msg).isAccepted());
- assertNotNull(msg = server.awaitMessage(60, TimeUnit.SECONDS));
- server.ackMessage(msg);
- assertNotNull(queue.awaitReply(60, TimeUnit.SECONDS));
-
- session.release();
- server.close();
- }
-
- private static SharedSourceSession newSourceSession(SourceSessionParams params) {
- Slobrok slobrok = null;
- try {
- slobrok = new Slobrok();
- } catch (ListenFailedException e) {
- fail();
- }
- return newSourceSession(slobrok.configId(), params);
- }
-
- private static SharedSourceSession newSourceSession(String slobrokId, SourceSessionParams params) {
- RPCNetworkParams netParams = new RPCNetworkParams().setSlobrokConfigId(slobrokId);
- MessageBusParams mbusParams = new MessageBusParams().addProtocol(new SimpleProtocol());
- SharedMessageBus mbus = SharedMessageBus.newInstance(mbusParams, netParams);
- SharedSourceSession session = mbus.newSourceSession(params);
- mbus.release();
- return session;
- }
-}