diff options
Diffstat (limited to 'container-messagebus/src/test/java/com/yahoo/messagebus/jdisc')
10 files changed, 2005 insertions, 0 deletions
diff --git a/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/ClientThreadingTestCase.java b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/ClientThreadingTestCase.java new file mode 100644 index 00000000000..62a9a864781 --- /dev/null +++ b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/ClientThreadingTestCase.java @@ -0,0 +1,149 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.jdisc; + +import com.yahoo.jdisc.application.ContainerBuilder; +import com.yahoo.jdisc.handler.FutureResponse; +import com.yahoo.jdisc.handler.ResponseHandler; +import com.yahoo.jdisc.test.TestDriver; +import com.yahoo.messagebus.DestinationSession; +import com.yahoo.messagebus.DestinationSessionParams; +import com.yahoo.messagebus.Message; +import com.yahoo.messagebus.MessageBus; +import com.yahoo.messagebus.MessageBusParams; +import com.yahoo.messagebus.MessageHandler; +import com.yahoo.messagebus.SourceSessionParams; +import com.yahoo.messagebus.network.local.LocalNetwork; +import com.yahoo.messagebus.network.local.LocalWire; +import com.yahoo.messagebus.routing.Route; +import com.yahoo.messagebus.shared.SharedMessageBus; +import com.yahoo.messagebus.shared.SharedSourceSession; +import com.yahoo.messagebus.test.SimpleMessage; +import com.yahoo.messagebus.test.SimpleProtocol; +import org.junit.Ignore; +import org.junit.Test; + +import java.net.URI; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + +/** + * @author Simon Thoresen Hult + */ +public class ClientThreadingTestCase { + + private static final int NUM_THREADS = 32; + private static final int NUM_REQUESTS = 1000; + + @Test + @Ignore + public void requireThatClientIsThreadSafe() throws Exception { + final LocalWire wire = new LocalWire(); + final Client client = new Client(wire); + final Server server = new Server(wire); + + final List<Callable<Boolean>> lst = new LinkedList<>(); + final Route route = Route.parse(server.session.getConnectionSpec()); + for (int i = 0; i < NUM_THREADS; ++i) { + lst.add(new RequestTask(client, route)); + } + final ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS); + for (final Future<Boolean> res : executor.invokeAll(lst, 60, TimeUnit.SECONDS)) { + assertThat(res.get(), is(true)); + } + + assertThat(client.close(), is(true)); + assertThat(server.close(), is(true)); + } + + private static final class RequestTask implements Callable<Boolean> { + + final Client client; + final Route route; + + RequestTask(final Client client, final Route route) { + this.client = client; + this.route = route; + } + + @Override + public Boolean call() throws Exception { + for (int i = 0; i < NUM_REQUESTS; ++i) { + final FutureResponse responseHandler = new FutureResponse(); + client.send(new SimpleMessage("foo").setRoute(route), responseHandler); + responseHandler.get(60, TimeUnit.SECONDS); + } + return true; + } + } + + private static class Client { + + final MbusClient delegate; + final TestDriver driver; + + Client(final LocalWire wire) { + driver = TestDriver.newSimpleApplicationInstanceWithoutOsgi(); + delegate = newMbusClient(wire); + + final ContainerBuilder builder = driver.newContainerBuilder(); + builder.clientBindings().bind("mbus://*/*", delegate); + driver.activateContainer(builder); + delegate.start(); + } + + void send(final Message msg, final ResponseHandler handler) { + final MbusRequest request = new MbusRequest(driver, URI.create("mbus://remote/"), msg); + request.setServerRequest(false); + request.connect(handler).close(null); + request.release(); + } + + boolean close() { + delegate.release(); + return driver.close(); + } + } + + private static class Server implements MessageHandler { + + final MessageBus mbus; + final DestinationSession session; + + Server(final LocalWire wire) { + mbus = new MessageBus( + new LocalNetwork(wire), + new MessageBusParams().addProtocol(new SimpleProtocol())); + session = mbus.createDestinationSession( + new DestinationSessionParams().setMessageHandler(this)); + } + + @Override + public void handleMessage(final Message msg) { + session.acknowledge(msg); + } + + boolean close() { + return session.destroy() && mbus.destroy(); + } + } + + private static MbusClient newMbusClient(final LocalWire wire) { + final SharedMessageBus mbus = new SharedMessageBus(new MessageBus( + new LocalNetwork(wire), + new MessageBusParams().addProtocol(new SimpleProtocol()))); + final SharedSourceSession session = mbus.newSourceSession( + new SourceSessionParams()); + final MbusClient client = new MbusClient(session); + session.release(); + mbus.release(); + return client; + } +} diff --git a/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusClientTestCase.java b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusClientTestCase.java new file mode 100644 index 00000000000..9cfd1fd02b9 --- /dev/null +++ b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusClientTestCase.java @@ -0,0 +1,345 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.jdisc; + +import com.yahoo.jdisc.Request; +import com.yahoo.jdisc.ResourceReference; +import com.yahoo.jdisc.Response; +import com.yahoo.jdisc.handler.CompletionHandler; +import com.yahoo.jdisc.handler.ContentChannel; +import com.yahoo.jdisc.handler.RequestDeniedException; +import com.yahoo.jdisc.handler.ResponseHandler; +import com.yahoo.messagebus.*; +import com.yahoo.messagebus.jdisc.test.ClientTestDriver; +import com.yahoo.messagebus.shared.ClientSession; +import com.yahoo.messagebus.test.SimpleMessage; +import com.yahoo.messagebus.test.SimpleReply; +import org.junit.Test; + +import java.net.URI; +import java.nio.ByteBuffer; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.*; + +/** + * @author Simon Thoresen Hult + */ +public class MbusClientTestCase { + + @Test + public void requireThatClientRetainsSession() { + MySession session = new MySession(); + assertEquals(1, session.refCount); + MbusClient client = new MbusClient(session); + assertEquals(2, session.refCount); + session.release(); + assertEquals(1, session.refCount); + client.destroy(); + assertEquals(0, session.refCount); + } + + @Test + public void requireThatRequestResponseWorks() { + ClientTestDriver driver = ClientTestDriver.newInstance(); + MyResponseHandler responseHandler = MyResponseHandler.newInstance(); + assertTrue(driver.sendMessage(new SimpleMessage("foo"), responseHandler)); + assertTrue(driver.awaitMessageAndSendReply(new EmptyReply())); + + Response response = responseHandler.awaitResponse(); + assertNotNull(response); + assertEquals(Response.Status.OK, response.getStatus()); + assertTrue(driver.close()); + } + + @Test + public void requireThatNonMbusRequestIsDenied() throws InterruptedException { + ClientTestDriver driver = ClientTestDriver.newInstance(); + Request serverReq = null; + Request clientReq = null; + try { + serverReq = driver.newServerRequest(); + clientReq = new Request(serverReq, URI.create("mbus://host/path")); + clientReq.connect(MyResponseHandler.newInstance()); + fail(); + } catch (RequestDeniedException e) { + System.out.println(e.getMessage()); + } finally { + if (serverReq != null) { + serverReq.release(); + } + if (clientReq != null) { + clientReq.release(); + } + } + assertTrue(driver.close()); + } + + @Test + public void requireThatRequestContentDoesNotSupportWrite() throws InterruptedException { + ClientTestDriver driver = ClientTestDriver.newInstance(); + MyResponseHandler responseHandler = MyResponseHandler.newInstance(); + + Request request = null; + ContentChannel content; + try { + request = driver.newClientRequest(new SimpleMessage("foo")); + content = request.connect(responseHandler); + } finally { + if (request != null) { + request.release(); + } + } + try { + content.write(ByteBuffer.allocate(69), null); + fail(); + } catch (UnsupportedOperationException e) { + + } + content.close(null); + + assertTrue(driver.awaitMessageAndSendReply(new EmptyReply())); + assertNotNull(responseHandler.awaitResponse()); + assertTrue(driver.close()); + } + + @Test + public void requireThatResponseIsMbus() { + ClientTestDriver driver = ClientTestDriver.newInstance(); + MyResponseHandler responseHandler = MyResponseHandler.newInstance(); + assertTrue(driver.sendMessage(new SimpleMessage("foo"), responseHandler)); + assertTrue(driver.awaitMessageAndSendReply(new EmptyReply())); + + Response response = responseHandler.awaitResponse(); + assertTrue(response instanceof MbusResponse); + Reply reply = ((MbusResponse)response).getReply(); + assertTrue(reply instanceof EmptyReply); + assertTrue(driver.close()); + } + + @Test + public void requireThatServerReceivesGivenMessage() { + ClientTestDriver driver = ClientTestDriver.newInstance(); + MyResponseHandler responseHandler = MyResponseHandler.newInstance(); + assertTrue(driver.sendMessage(new SimpleMessage("foo"), responseHandler)); + + Message msg = driver.awaitMessage(); + assertTrue(msg instanceof SimpleMessage); + assertEquals("foo", ((SimpleMessage)msg).getValue()); + + Reply reply = new EmptyReply(); + reply.swapState(msg); + driver.sendReply(reply); + + assertNotNull(responseHandler.awaitResponse()); + assertTrue(driver.close()); + } + + @Test + public void requireThatClientReceivesGivenReply() { + ClientTestDriver driver = ClientTestDriver.newInstance(); + MyResponseHandler responseHandler = MyResponseHandler.newInstance(); + assertTrue(driver.sendMessage(new SimpleMessage("foo"), responseHandler)); + + Message msg = driver.awaitMessage(); // TODO: Timing sensitive + assertNotNull(msg); + Reply reply = new SimpleReply("bar"); + reply.swapState(msg); + driver.sendReply(reply); + + Response response = responseHandler.awaitResponse(); + assertTrue(response instanceof MbusResponse); + reply = ((MbusResponse)response).getReply(); + assertTrue(reply instanceof SimpleReply); + assertEquals("bar", ((SimpleReply)reply).getValue()); + assertTrue(driver.close()); + } + + @Test + public void requireThatStateIsTransferredToResponse() { + ClientTestDriver driver = ClientTestDriver.newInstance(); + MyResponseHandler responseHandler = MyResponseHandler.newInstance(); + + Message msg = new SimpleMessage("foo"); + Object pushedCtx = new Object(); + msg.setContext(pushedCtx); + ReplyHandler pushedHandler = new MyReplyHandler(); + msg.pushHandler(pushedHandler); + Object currentCtx = new Object(); + msg.setContext(currentCtx); + msg.getTrace().setLevel(6); + assertTrue(driver.sendMessage(msg, responseHandler)); + assertTrue(driver.awaitMessageAndSendReply(new EmptyReply())); + + Response response = responseHandler.awaitResponse(); + assertTrue(response.getClass().getName(), response instanceof MbusResponse); + Reply reply = ((MbusResponse)response).getReply(); + assertSame(currentCtx, reply.getContext()); + assertEquals(6, reply.getTrace().getLevel()); + assertSame(pushedHandler, reply.popHandler()); + assertSame(pushedCtx, reply.getContext()); + assertTrue(driver.close()); + } + + @Test + public void requireThatStateIsTransferredToSyncMbusSendFailureResponse() { + ClientTestDriver driver = ClientTestDriver.newInstance(); + driver.sourceSession().close(); + + Message msg = new SimpleMessage("foo"); + ReplyHandler pushedHandler = new MyReplyHandler(); + Object pushedCtx = new Object(); + msg.setContext(pushedCtx); + msg.pushHandler(pushedHandler); + Object currentCtx = new Object(); + msg.setContext(currentCtx); + msg.getTrace().setLevel(6); + + MyResponseHandler responseHandler = MyResponseHandler.newInstance(); + driver.sendMessage(msg, responseHandler); + + Response response = responseHandler.awaitResponse(); + assertNotNull(response); + assertTrue(response.getClass().getName(), response instanceof MbusResponse); + Reply reply = ((MbusResponse)response).getReply(); + assertSame(currentCtx, reply.getContext()); + assertEquals(6, reply.getTrace().getLevel()); + assertSame(pushedHandler, reply.popHandler()); + assertSame(pushedCtx, reply.getContext()); + assertTrue(driver.close()); + } + + @Test + public void requireThatStateIsTransferredToTimeoutResponse() throws InterruptedException { + ClientTestDriver driver = ClientTestDriver.newInstance(); + MyResponseHandler responseHandler = MyResponseHandler.newInstance(); + + Message msg = new SimpleMessage("foo"); + ReplyHandler pushedHandler = new MyReplyHandler(); + Object pushedCtx = new Object(); + msg.setContext(pushedCtx); + msg.pushHandler(pushedHandler); + Object currentCtx = new Object(); + msg.setContext(currentCtx); + msg.getTrace().setLevel(6); + + Request request = driver.newClientRequest(msg); + request.setTimeout(1, TimeUnit.MILLISECONDS); + assertTrue(driver.sendRequest(request, responseHandler)); + request.release(); + + Response response = responseHandler.awaitResponse(); + assertNotNull(response); + assertTrue(response.getClass().getName(), response instanceof MbusResponse); + Reply reply = ((MbusResponse)response).getReply(); + assertSame(currentCtx, reply.getContext()); + assertEquals(6, reply.getTrace().getLevel()); + assertSame(pushedHandler, reply.popHandler()); + assertSame(pushedCtx, reply.getContext()); + assertTrue(driver.close()); + } + + @Test + public void requireThatSyncMbusSendFailureRespondsWithError() { + ClientTestDriver driver = ClientTestDriver.newInstance(); + driver.sourceSession().close(); + + MyResponseHandler responseHandler = MyResponseHandler.newInstance(); + driver.sendMessage(new SimpleMessage("foo"), responseHandler); + Response response = responseHandler.awaitResponse(); + assertNotNull(response); + assertTrue(response.getClass().getName(), response instanceof MbusResponse); + Reply reply = ((MbusResponse)response).getReply(); + assertEquals(1, reply.getNumErrors()); + assertEquals(ErrorCode.SEND_QUEUE_CLOSED, reply.getError(0).getCode()); + assertTrue(driver.close()); + } + + private static class MyResponseHandler implements ResponseHandler { + + final MyResponseContent content; + Response response; + + MyResponseHandler(MyResponseContent content) { + this.content = content; + } + + Response awaitResponse() { + try { + content.closeLatch.await(60, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw new IllegalStateException(e); + } + if (response instanceof MbusResponse) { + //System.out.println(((MbusResponse)response).getReply().getTrace()); + } + return response; + } + + @Override + public ContentChannel handleResponse(Response response) { + this.response = response; + return content; + } + + static MyResponseHandler newInstance() { + return new MyResponseHandler(new MyResponseContent()); + } + } + + private static class MyResponseContent implements ContentChannel { + + final CountDownLatch writeLatch = new CountDownLatch(1); + final CountDownLatch closeLatch = new CountDownLatch(1); + + @Override + public void write(ByteBuffer buf, CompletionHandler handler) { + if (handler != null) { + handler.completed(); + } + writeLatch.countDown(); + } + + @Override + public void close(CompletionHandler handler) { + if (handler != null) { + handler.completed(); + } + closeLatch.countDown(); + } + } + + private static class MySession implements ClientSession { + + int refCount = 1; + + @Override + public Result sendMessage(Message msg) { + return null; + } + + @Override + public ResourceReference refer() { + ++refCount; + return new ResourceReference() { + @Override + public void close() { + --refCount; + } + }; + } + + @Override + public void release() { + --refCount; + } + } + + private static class MyReplyHandler implements ReplyHandler { + + @Override + public void handleReply(Reply reply) { + + } + } +} diff --git a/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusRequestHandlerTestCase.java b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusRequestHandlerTestCase.java new file mode 100644 index 00000000000..316ad18bae9 --- /dev/null +++ b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusRequestHandlerTestCase.java @@ -0,0 +1,121 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.jdisc; + +import com.google.common.util.concurrent.ListenableFuture; +import com.yahoo.jdisc.Request; +import com.yahoo.jdisc.Response; +import com.yahoo.jdisc.application.ContainerBuilder; +import com.yahoo.jdisc.handler.RequestDispatch; +import com.yahoo.jdisc.test.TestDriver; +import com.yahoo.messagebus.EmptyReply; +import com.yahoo.messagebus.Message; +import com.yahoo.messagebus.Reply; +import com.yahoo.messagebus.test.SimpleMessage; +import org.junit.Test; + +import java.net.URI; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * @author Simon Thoresen Hult + */ +public class MbusRequestHandlerTestCase { + + @Test + public void requireThatNonMbusRequestThrows() throws Exception { + final TestDriver driver = newTestDriver(SameThreadReplier.INSTANCE); + try { + new RequestDispatch() { + + @Override + protected Request newRequest() { + return new Request(driver, URI.create("mbus://localhost/")); + } + }.connect(); + fail(); + } catch (UnsupportedOperationException e) { + assertEquals("Expected MbusRequest, got com.yahoo.jdisc.Request.", e.getMessage()); + } + assertTrue(driver.close()); + } + + @Test + public void requireThatHandlerCanRespondInSameThread() throws Exception { + TestDriver driver = newTestDriver(SameThreadReplier.INSTANCE); + + Response response = dispatchMessage(driver, new SimpleMessage("msg")).get(60, TimeUnit.SECONDS); + assertTrue(response instanceof MbusResponse); + assertEquals(Response.Status.OK, response.getStatus()); + Reply reply = ((MbusResponse)response).getReply(); + assertTrue(reply instanceof EmptyReply); + assertFalse(reply.hasErrors()); + + assertTrue(driver.close()); + } + + @Test + public void requireThatHandlerCanRespondInOtherThread() throws Exception { + TestDriver driver = newTestDriver(ThreadedReplier.INSTANCE); + + Response response = dispatchMessage(driver, new SimpleMessage("msg")).get(60, TimeUnit.SECONDS); + assertTrue(response instanceof MbusResponse); + assertEquals(Response.Status.OK, response.getStatus()); + Reply reply = ((MbusResponse)response).getReply(); + assertTrue(reply instanceof EmptyReply); + assertFalse(reply.hasErrors()); + + assertTrue(driver.close()); + } + + private static TestDriver newTestDriver(MbusRequestHandler handler) { + TestDriver driver = TestDriver.newSimpleApplicationInstanceWithoutOsgi(); + ContainerBuilder builder = driver.newContainerBuilder(); + builder.serverBindings().bind("mbus://*/*", handler); + driver.activateContainer(builder); + return driver; + } + + private static ListenableFuture<Response> dispatchMessage(final TestDriver driver, final Message msg) { + return new RequestDispatch() { + + @Override + protected Request newRequest() { + return new MbusRequest(driver, URI.create("mbus://localhost/"), msg); + } + }.dispatch(); + } + + private static class SameThreadReplier extends MbusRequestHandler { + + final static SameThreadReplier INSTANCE = new SameThreadReplier(); + + @Override + public void handleMessage(Message msg) { + Reply reply = new EmptyReply(); + reply.swapState(msg); + reply.popHandler().handleReply(reply); + } + } + + private static class ThreadedReplier extends MbusRequestHandler { + + final static ThreadedReplier INSTANCE = new ThreadedReplier(); + + @Override + public void handleMessage(final Message msg) { + Executors.newSingleThreadExecutor().execute(new Runnable() { + + @Override + public void run() { + SameThreadReplier.INSTANCE.handleMessage(msg); + } + }); + } + } +} diff --git a/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusRequestTestCase.java b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusRequestTestCase.java new file mode 100644 index 00000000000..c68ab4e6742 --- /dev/null +++ b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusRequestTestCase.java @@ -0,0 +1,73 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.jdisc; + +import com.yahoo.jdisc.test.TestDriver; +import com.yahoo.messagebus.Message; +import com.yahoo.messagebus.test.SimpleMessage; +import com.yahoo.text.Utf8String; +import org.junit.Test; + +import java.net.URI; + +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * @author Simon Thoresen Hult + */ +public class MbusRequestTestCase { + + @Test + public void requireThatAccessorsWork() { + TestDriver driver = TestDriver.newSimpleApplicationInstanceWithoutOsgi(); + driver.activateContainer(driver.newContainerBuilder()); + + MyMessage msg = new MyMessage(); + MbusRequest request = new MbusRequest(driver, URI.create("mbus://host/path"), msg); + assertSame(msg, request.getMessage()); + request.release(); + driver.close(); + } + + @Test + public void requireThatMessageCanNotBeNullInRootRequest() { + TestDriver driver = TestDriver.newSimpleApplicationInstanceWithoutOsgi(); + driver.activateContainer(driver.newContainerBuilder()); + try { + new MbusRequest(driver, URI.create("mbus://host/path"), null); + fail(); + } catch (NullPointerException e) { + // expected + } + assertTrue(driver.close()); + } + + @Test + public void requireThatMessageCanNotBeNullInChildRequest() { + TestDriver driver = TestDriver.newSimpleApplicationInstanceWithoutOsgi(); + driver.activateContainer(driver.newContainerBuilder()); + MbusRequest parent = new MbusRequest(driver, URI.create("mbus://host/path"), new SimpleMessage("foo")); + try { + new MbusRequest(parent, URI.create("mbus://host/path"), null); + fail(); + } catch (NullPointerException e) { + // expected + } + parent.release(); + assertTrue(driver.close()); + } + + private class MyMessage extends Message { + + @Override + public Utf8String getProtocol() { + return null; + } + + @Override + public int getType() { + return 0; + } + } +} diff --git a/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusResponseTestCase.java b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusResponseTestCase.java new file mode 100644 index 00000000000..eb4cb949770 --- /dev/null +++ b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusResponseTestCase.java @@ -0,0 +1,46 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.jdisc; + +import com.yahoo.jdisc.Response; +import com.yahoo.messagebus.Reply; +import com.yahoo.text.Utf8String; +import org.junit.Test; + +import static org.junit.Assert.assertSame; +import static org.junit.Assert.fail; + +/** + * @author Simon Thoresen Hult + */ +public class MbusResponseTestCase { + + @Test + public void requireThatAccessorsWork() { + MyReply reply = new MyReply(); + MbusResponse response = new MbusResponse(Response.Status.OK, reply); + assertSame(reply, response.getReply()); + } + + @Test + public void requireThatReplyCanNotBeNull() { + try { + new MbusResponse(Response.Status.OK, null); + fail(); + } catch (NullPointerException e) { + + } + } + + private class MyReply extends Reply { + + @Override + public Utf8String getProtocol() { + return null; + } + + @Override + public int getType() { + return 0; + } + } +} diff --git a/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusServerConformanceTest.java b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusServerConformanceTest.java new file mode 100644 index 00000000000..bf89f3869ed --- /dev/null +++ b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusServerConformanceTest.java @@ -0,0 +1,694 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.jdisc; + +import com.google.inject.AbstractModule; +import com.google.inject.Module; +import com.yahoo.jdisc.test.ServerProviderConformanceTest; +import com.yahoo.messagebus.DestinationSessionParams; +import com.yahoo.messagebus.MessageBus; +import com.yahoo.messagebus.MessageBusParams; +import com.yahoo.messagebus.Reply; +import com.yahoo.messagebus.ReplyHandler; +import com.yahoo.messagebus.SourceSession; +import com.yahoo.messagebus.SourceSessionParams; +import com.yahoo.messagebus.network.local.LocalNetwork; +import com.yahoo.messagebus.network.local.LocalWire; +import com.yahoo.messagebus.routing.Route; +import com.yahoo.messagebus.shared.ServerSession; +import com.yahoo.messagebus.shared.SharedMessageBus; +import com.yahoo.messagebus.test.SimpleMessage; +import com.yahoo.messagebus.test.SimpleProtocol; +import org.hamcrest.Matcher; +import org.junit.Ignore; +import org.junit.Test; + +import java.io.Closeable; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.concurrent.BlockingDeque; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; + +import static com.yahoo.messagebus.ErrorCode.APP_FATAL_ERROR; +import static com.yahoo.messagebus.ErrorCode.SESSION_BUSY; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.MatcherAssert.assertThat; + +/** + * @author Simon Thoresen Hult + */ +public class MbusServerConformanceTest extends ServerProviderConformanceTest { + + /* Many of the "success" expectations here (may) seem odd. But this is the current behavior of the + * messagebus server. We should probably look into whether the behavior is correct in all cases. + */ + + @Override + @Test + public void testContainerNotReadyException() throws Throwable { + new TestRunner().setRequestTimeout(100, TimeUnit.MILLISECONDS) + .expectError(is(SESSION_BUSY)) + .executeAndClose(); + } + + @Override + @Test + public void testBindingSetNotFoundException() throws Throwable { + new TestRunner().expectError(is(APP_FATAL_ERROR)) + .executeAndClose(); + } + + @Override + @Test + public void testNoBindingSetSelectedException() throws Throwable { + new TestRunner().expectError(is(APP_FATAL_ERROR)) + .executeAndClose(); + } + + @Override + @Test + public void testBindingNotFoundException() throws Throwable { + new TestRunner().expectError(is(APP_FATAL_ERROR)) + .executeAndClose(); + } + + @Override + @Test + public void testRequestHandlerWithSyncCloseResponse() throws Throwable { + new TestRunner().expectSuccess() + .executeAndClose(); + } + + @Override + @Test + public void testRequestHandlerWithSyncWriteResponse() throws Throwable { + new TestRunner().expectSuccess() + .executeAndClose(); + } + + @Override + @Test + public void testRequestHandlerWithSyncHandleResponse() throws Throwable { + new TestRunner().expectSuccess() + .executeAndClose(); + } + + @Override + @Test + public void testRequestHandlerWithAsyncHandleResponse() throws Throwable { + new TestRunner().expectSuccess() + .executeAndClose(); + } + + @Override + @Test + public void testRequestException() throws Throwable { + new TestRunner().expectError(is(APP_FATAL_ERROR)) + .executeAndClose(); + } + + @Override + @Test + public void testRequestExceptionWithSyncCloseResponse() throws Throwable { + new TestRunner().expectSuccess() + .executeAndClose(); + } + + @Override + @Test + public void testRequestExceptionWithSyncWriteResponse() throws Throwable { + new TestRunner().expectSuccess() + .executeAndClose(); + } + + @Override + @Test + public void testRequestNondeterministicExceptionWithSyncHandleResponse() throws Throwable { + new TestRunner().expectSuccess() + .executeAndClose(); + } + + @Override + @Test + public void testRequestExceptionBeforeResponseWriteWithSyncHandleResponse() throws Throwable { + new TestRunner().expectSuccess() + .executeAndClose(); + } + + @Override + @Test + @Ignore // N/A: The messagebus protocol does not have content. + public void testRequestExceptionAfterResponseWriteWithSyncHandleResponse() throws Throwable { + } + + @Override + @Test + public void testRequestNondeterministicExceptionWithAsyncHandleResponse() throws Throwable { + new TestRunner().executeAndClose(); + } + + @Override + @Test + public void testRequestExceptionBeforeResponseWriteWithAsyncHandleResponse() throws Throwable { + new TestRunner().expectError(is(APP_FATAL_ERROR)) + .executeAndClose(); + } + + @Override + @Test + public void testRequestExceptionAfterResponseCloseNoContentWithAsyncHandleResponse() throws Throwable { + new TestRunner().expectSuccess() + .executeAndClose(); + } + + @Override + @Test + @Ignore // N/A: The messagebus protocol does not have content. + public void testRequestExceptionAfterResponseWriteWithAsyncHandleResponse() throws Throwable { + } + + @Override + @Test + @Ignore // N/A: The messagebus protocol does not have content. + public void testRequestContentWriteWithSyncCompletion() throws Throwable { + } + + @Override + @Test + @Ignore // N/A: The messagebus protocol does not have content. + public void testRequestContentWriteWithAsyncCompletion() throws Throwable { + } + + @Override + @Test + @Ignore // N/A: The messagebus protocol does not have content. + public void testRequestContentWriteWithNondeterministicSyncFailure() throws Throwable { + } + + @Override + @Test + @Ignore // N/A: The messagebus protocol does not have content. + public void testRequestContentWriteWithSyncFailureBeforeResponseWrite() throws Throwable { + } + + @Override + @Test + @Ignore // N/A: The messagebus protocol does not have content. + public void testRequestContentWriteWithSyncFailureAfterResponseWrite() throws Throwable { + } + + @Override + @Test + @Ignore // N/A: The messagebus protocol does not have content. + public void testRequestContentWriteWithNondeterministicAsyncFailure() throws Throwable { + } + + @Override + @Test + @Ignore // N/A: The messagebus protocol does not have content. + public void testRequestContentWriteWithAsyncFailureBeforeResponseWrite() throws Throwable { + } + + @Override + @Test + @Ignore // N/A: The messagebus protocol does not have content. + public void testRequestContentWriteWithAsyncFailureAfterResponseWrite() throws Throwable { + } + + @Override + @Test + @Ignore // N/A: The messagebus protocol does not have content. + public void testRequestContentWriteWithAsyncFailureAfterResponseCloseNoContent() throws Throwable { + } + + @Override + @Test + @Ignore // N/A: The messagebus protocol does not have content. + public void testRequestContentWriteNondeterministicException() throws Throwable { + } + + @Override + @Test + @Ignore // N/A: The messagebus protocol does not have content. + public void testRequestContentWriteExceptionBeforeResponseWrite() throws Throwable { + } + + @Override + @Test + @Ignore // N/A: The messagebus protocol does not have content. + public void testRequestContentWriteExceptionAfterResponseWrite() throws Throwable { + } + + @Override + @Test + @Ignore // N/A: The messagebus protocol does not have content. + public void testRequestContentWriteExceptionAfterResponseCloseNoContent() throws Throwable { + } + + @Override + @Test + @Ignore // N/A: The messagebus protocol does not have content. + public void testRequestContentWriteNondeterministicExceptionWithSyncCompletion() throws Throwable { + } + + @Override + @Test + @Ignore // N/A: The messagebus protocol does not have content. + public void testRequestContentWriteExceptionBeforeResponseWriteWithSyncCompletion() throws Throwable { + } + + @Override + @Test + @Ignore // N/A: The messagebus protocol does not have content. + public void testRequestContentWriteExceptionAfterResponseWriteWithSyncCompletion() throws Throwable { + } + + @Override + @Test + @Ignore // N/A: The messagebus protocol does not have content. + public void testRequestContentWriteExceptionAfterResponseCloseNoContentWithSyncCompletion() throws Throwable { + } + + @Override + @Test + @Ignore // N/A: The messagebus protocol does not have content. + public void testRequestContentWriteNondeterministicExceptionWithAsyncCompletion() throws Throwable { + } + + @Override + @Test + @Ignore // N/A: The messagebus protocol does not have content. + public void testRequestContentWriteExceptionBeforeResponseWriteWithAsyncCompletion() throws Throwable { + } + + @Override + @Test + @Ignore // N/A: The messagebus protocol does not have content. + public void testRequestContentWriteExceptionAfterResponseWriteWithAsyncCompletion() throws Throwable { + } + + @Override + @Test + @Ignore // N/A: The messagebus protocol does not have content. + public void testRequestContentWriteExceptionAfterResponseCloseNoContentWithAsyncCompletion() throws Throwable { + } + + @Override + @Test + @Ignore // N/A: The messagebus protocol does not have content. + public void testRequestContentWriteExceptionWithNondeterministicSyncFailure() throws Throwable { + } + + @Override + @Test + @Ignore // N/A: The messagebus protocol does not have content. + public void testRequestContentWriteExceptionWithSyncFailureBeforeResponseWrite() throws Throwable { + } + + @Override + @Test + @Ignore // N/A: The messagebus protocol does not have content. + public void testRequestContentWriteExceptionWithSyncFailureAfterResponseWrite() throws Throwable { + } + + @Override + @Test + @Ignore // N/A: The messagebus protocol does not have content. + public void testRequestContentWriteExceptionWithSyncFailureAfterResponseCloseNoContent() throws Throwable { + } + + @Override + @Test + @Ignore // N/A: The messagebus protocol does not have content. + public void testRequestContentWriteExceptionWithNondeterministicAsyncFailure() throws Throwable { + } + + @Override + @Test + @Ignore // N/A: The messagebus protocol does not have content. + public void testRequestContentWriteExceptionWithAsyncFailureBeforeResponseWrite() throws Throwable { + } + + @Override + @Test + @Ignore // N/A: The messagebus protocol does not have content. + public void testRequestContentWriteExceptionWithAsyncFailureAfterResponseWrite() throws Throwable { + } + + @Override + @Test + @Ignore // N/A: The messagebus protocol does not have content. + public void testRequestContentWriteExceptionWithAsyncFailureAfterResponseCloseNoContent() throws Throwable { + } + + @Override + @Test + public void testRequestContentCloseWithSyncCompletion() throws Throwable { + new TestRunner().expectSuccess() + .executeAndClose(); + } + + @Override + @Test + public void testRequestContentCloseWithAsyncCompletion() throws Throwable { + new TestRunner().expectSuccess() + .executeAndClose(); + } + + @Override + @Test + public void testRequestContentCloseWithNondeterministicSyncFailure() throws Throwable { + new TestRunner().expectSuccess() + .executeAndClose(); + } + + @Override + @Test + public void testRequestContentCloseWithSyncFailureBeforeResponseWrite() throws Throwable { + new TestRunner().expectSuccess() + .executeAndClose(); + } + + @Override + @Test + @Ignore // N/A: The messagebus protocol does not have content. + public void testRequestContentCloseWithSyncFailureAfterResponseWrite() throws Throwable { + } + + @Override + @Test + public void testRequestContentCloseWithSyncFailureAfterResponseCloseNoContent() throws Throwable { + new TestRunner().expectSuccess() + .executeAndClose(); + } + + @Override + @Test + public void testRequestContentCloseWithNondeterministicAsyncFailure() throws Throwable { + new TestRunner().expectSuccess() + .executeAndClose(); + } + + @Override + @Test + public void testRequestContentCloseWithAsyncFailureBeforeResponseWrite() throws Throwable { + new TestRunner().expectSuccess() + .executeAndClose(); + } + + @Override + @Test + @Ignore // N/A: The messagebus protocol does not have content. + public void testRequestContentCloseWithAsyncFailureAfterResponseWrite() throws Throwable { + } + + @Override + @Test + public void testRequestContentCloseWithAsyncFailureAfterResponseCloseNoContent() throws Throwable { + new TestRunner().expectSuccess() + .executeAndClose(); + } + + @Override + @Test + public void testRequestContentCloseNondeterministicException() throws Throwable { + new TestRunner().expectSuccess() + .executeAndClose(); + } + + @Override + @Test + public void testRequestContentCloseExceptionBeforeResponseWrite() throws Throwable { + new TestRunner().expectSuccess() + .executeAndClose(); + } + + @Override + @Test + @Ignore // N/A: The messagebus protocol does not have content. + public void testRequestContentCloseExceptionAfterResponseWrite() throws Throwable { + } + + @Override + @Test + public void testRequestContentCloseExceptionAfterResponseCloseNoContent() throws Throwable { + new TestRunner().expectSuccess() + .executeAndClose(); + } + + @Override + @Test + public void testRequestContentCloseNondeterministicExceptionWithSyncCompletion() throws Throwable { + new TestRunner().expectSuccess() + .executeAndClose(); + } + + @Override + @Test + public void testRequestContentCloseExceptionBeforeResponseWriteWithSyncCompletion() throws Throwable { + new TestRunner().expectSuccess() + .executeAndClose(); + } + + @Override + @Test + @Ignore // N/A: The messagebus protocol does not have content. + public void testRequestContentCloseExceptionAfterResponseWriteWithSyncCompletion() throws Throwable { + } + + @Override + @Test + public void testRequestContentCloseExceptionAfterResponseCloseNoContentWithSyncCompletion() throws Throwable { + new TestRunner().expectSuccess() + .executeAndClose(); + } + + @Override + @Test + public void testRequestContentCloseNondeterministicExceptionWithAsyncCompletion() throws Throwable { + new TestRunner().expectSuccess() + .executeAndClose(); + } + + @Override + @Test + public void testRequestContentCloseExceptionBeforeResponseWriteWithAsyncCompletion() throws Throwable { + new TestRunner().expectSuccess() + .executeAndClose(); + } + + @Override + @Test + @Ignore // N/A: The messagebus protocol does not have content. + public void testRequestContentCloseExceptionAfterResponseWriteWithAsyncCompletion() throws Throwable { + } + + @Override + @Test + public void testRequestContentCloseExceptionAfterResponseCloseNoContentWithAsyncCompletion() throws Throwable { + new TestRunner().expectSuccess() + .executeAndClose(); + } + + @Override + @Test + public void testRequestContentCloseNondeterministicExceptionWithSyncFailure() throws Throwable { + new TestRunner().expectSuccess() + .executeAndClose(); + } + + @Override + @Test + public void testRequestContentCloseExceptionBeforeResponseWriteWithSyncFailure() throws Throwable { + new TestRunner().expectSuccess() + .executeAndClose(); + } + + @Override + @Test + @Ignore // N/A: The messagebus protocol does not have content. + public void testRequestContentCloseExceptionAfterResponseWriteWithSyncFailure() throws Throwable { + } + + @Override + @Test + public void testRequestContentCloseExceptionAfterResponseCloseNoContentWithSyncFailure() throws Throwable { + new TestRunner().expectSuccess() + .executeAndClose(); + } + + @Override + @Test + public void testRequestContentCloseNondeterministicExceptionWithAsyncFailure() throws Throwable { + new TestRunner().expectSuccess() + .executeAndClose(); + } + + @Override + @Test + public void testRequestContentCloseExceptionBeforeResponseWriteWithAsyncFailure() throws Throwable { + new TestRunner().expectSuccess() + .executeAndClose(); + } + + @Override + @Test + @Ignore // N/A: The messagebus protocol does not have content. + public void testRequestContentCloseExceptionAfterResponseWriteWithAsyncFailure() throws Throwable { + } + + @Override + @Test + public void testRequestContentCloseExceptionAfterResponseCloseNoContentWithAsyncFailure() throws Throwable { + new TestRunner().expectSuccess() + .executeAndClose(); + } + + @Override + @Test + @Ignore // N/A: The messagebus protocol does not have content. + public void testResponseWriteCompletionException() throws Throwable { + } + + @Override + @Test + public void testResponseCloseCompletionException() throws Throwable { + new TestRunner().expectSuccess() + .executeAndClose(); + } + + @Override + @Test + public void testResponseCloseCompletionExceptionNoContent() throws Throwable { + new TestRunner().expectSuccess() + .executeAndClose(); + } + + private class TestRunner implements Adapter<MbusServer, MyClient, Reply> { + + final LocalWire wire = new LocalWire(); + final SharedMessageBus mbus; + final ServerSession session; + Matcher<Integer> expectedError = null; + boolean successExpected = false; + long timeoutMillis = TimeUnit.SECONDS.toMillis(60); + + TestRunner() { + this(new MessageBusParams().addProtocol(new SimpleProtocol()), + new DestinationSessionParams()); + } + + TestRunner(MessageBusParams mbusParams, DestinationSessionParams sessionParams) { + this.mbus = new SharedMessageBus(new MessageBus(new LocalNetwork(wire), mbusParams)); + this.session = mbus.newDestinationSession(sessionParams); + } + + TestRunner setRequestTimeout(long timeout, TimeUnit unit) { + timeoutMillis = unit.toMillis(timeout); + return this; + } + + TestRunner expectError(Matcher<Integer> matcher) { + assertThat(successExpected, is(false)); + expectedError = matcher; + return this; + } + + TestRunner expectSuccess() { + assertThat(expectedError, is(nullValue())); + successExpected = true; + return this; + } + + @Override + public Module newConfigModule() { + return new AbstractModule() { + + @Override + protected void configure() { + bind(ServerSession.class).toInstance(session); + } + }; + } + + @Override + public Class<MbusServer> getServerProviderClass() { + return MbusServer.class; + } + + @Override + public MyClient newClient(MbusServer server) throws Throwable { + return new MyClient(wire, server.connectionSpec()); + } + + @Override + public Reply executeRequest(MyClient client, boolean withRequestContent) throws Throwable { + // This protocol doesn't have the concept of "request content", so if we are asked to send any, it's a bug. + assertThat(withRequestContent, is(false)); + + final SimpleMessage msg = new SimpleMessage("foo"); + msg.getTrace().setLevel(9); + msg.setRoute(client.route); + msg.setTimeRemaining(timeoutMillis); + assertThat("client.session.send(msg).isAccepted()", + client.session.send(msg).isAccepted(), is(true)); + + final Reply reply = client.replies.poll(60, TimeUnit.SECONDS); + assertThat("reply != null", reply, notNullValue()); + return reply; + } + + @Override + public Iterable<ByteBuffer> newResponseContent() { + return Collections.emptyList(); + } + + @Override + public void validateResponse(Reply reply) throws Throwable { + final String trace = String.valueOf(reply.getTrace()); + if (expectedError != null) { + assertThat(reply.hasErrors(), is(true)); + final int error = reply.getError(0).getCode(); + assertThat(trace, error, expectedError); + } + if (successExpected) { + assertThat(trace, reply.hasErrors(), is(false)); + } + } + + void executeAndClose() throws Throwable { + runTest(this); + session.release(); + mbus.release(); + } + } + + public static class MyClient implements Closeable, ReplyHandler { + + final BlockingDeque<Reply> replies = new LinkedBlockingDeque<>(); + final MessageBus mbus; + final Route route; + final SourceSession session; + + MyClient(LocalWire wire, String connectionSpec) { + this.mbus = new MessageBus(new LocalNetwork(wire), + new MessageBusParams().addProtocol(new SimpleProtocol())); + this.session = mbus.createSourceSession(new SourceSessionParams().setReplyHandler(this)); + this.route = Route.parse(connectionSpec); + } + + @Override + public void close() throws IOException { + session.destroy(); + mbus.destroy(); + } + + @Override + public void handleReply(Reply reply) { + replies.addLast(reply); + } + } +} diff --git a/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusServerTestCase.java b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusServerTestCase.java new file mode 100644 index 00000000000..9d45d2e7abf --- /dev/null +++ b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusServerTestCase.java @@ -0,0 +1,374 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.jdisc; + +import com.google.inject.AbstractModule; +import com.yahoo.jdisc.Request; +import com.yahoo.jdisc.ResourceReference; +import com.yahoo.jdisc.Response; +import com.yahoo.jdisc.application.BindingSetSelector; +import com.yahoo.jdisc.handler.*; +import com.yahoo.messagebus.Error; +import com.yahoo.messagebus.*; +import com.yahoo.messagebus.jdisc.test.ServerTestDriver; +import com.yahoo.messagebus.shared.ServerSession; +import com.yahoo.messagebus.test.SimpleMessage; +import com.yahoo.messagebus.test.SimpleReply; +import org.junit.Test; + +import java.net.URI; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.*; + +/** + * @author Simon Thoresen Hult + */ +public class MbusServerTestCase { + + @Test + public void requireThatServerRetainsSession() { + MySession session = new MySession(); + assertEquals(1, session.refCount); + MbusServer server = new MbusServer(null, session); + assertEquals(2, session.refCount); + session.release(); + assertEquals(1, session.refCount); + server.destroy(); + assertEquals(0, session.refCount); + } + + @Test + public void requireThatNoBindingSetSelectedExceptionIsCaught() { + ServerTestDriver driver = ServerTestDriver.newUnboundInstance(true, new MySelector(null)); + assertTrue(driver.sendMessage(new SimpleMessage("foo"))); + assertNotNull(driver.awaitErrors(ErrorCode.APP_FATAL_ERROR)); + assertTrue(driver.close()); + } + + @Test + public void requireThatBindingSetNotFoundExceptionIsCaught() { + ServerTestDriver driver = ServerTestDriver.newUnboundInstance(true, new MySelector("foo")); + assertTrue(driver.sendMessage(new SimpleMessage("bar"))); + assertNotNull(driver.awaitErrors(ErrorCode.APP_FATAL_ERROR)); + assertTrue(driver.close()); + } + + @Test + public void requireThatContainerNotReadyExceptionIsCaught() { + ServerTestDriver driver = ServerTestDriver.newInactiveInstance(true); + assertTrue(driver.sendMessage(new SimpleMessage("foo"))); + assertNotNull(driver.awaitErrors(ErrorCode.APP_FATAL_ERROR)); + assertTrue(driver.close()); + } + + @Test + public void requireThatBindingNotFoundExceptionIsCaught() { + ServerTestDriver driver = ServerTestDriver.newUnboundInstance(true); + assertTrue(driver.sendMessage(new SimpleMessage("foo"))); + assertNotNull(driver.awaitErrors(ErrorCode.APP_FATAL_ERROR)); + assertTrue(driver.close()); + } + + @Test + public void requireThatRequestDeniedExceptionIsCaught() { + ServerTestDriver driver = ServerTestDriver.newInstance(MyRequestHandler.newRequestDenied(), true); + assertTrue(driver.sendMessage(new SimpleMessage("foo"))); + assertNotNull(driver.awaitErrors(ErrorCode.APP_FATAL_ERROR)); + assertTrue(driver.close()); + } + + @Test + public void requireThatRequestResponseWorks() { + MyRequestHandler requestHandler = MyRequestHandler.newInstance(); + ServerTestDriver driver = ServerTestDriver.newInstance(requestHandler, true); + assertTrue(driver.sendMessage(new SimpleMessage("foo"))); + + assertNotNull(requestHandler.awaitRequest()); + assertTrue(requestHandler.sendResponse(new Response(Response.Status.OK))); + + assertNotNull(driver.awaitSuccess()); + assertTrue(driver.close()); + } + + @Test + public void requireThatRequestIsMbus() { + MyRequestHandler requestHandler = MyRequestHandler.newInstance(); + ServerTestDriver driver = ServerTestDriver.newInstance(requestHandler, true); + assertTrue(driver.sendMessage(new SimpleMessage("foo"))); + + Request request = requestHandler.awaitRequest(); + assertTrue(request instanceof MbusRequest); + Message msg = ((MbusRequest)request).getMessage(); + assertTrue(msg instanceof SimpleMessage); + assertEquals("foo", ((SimpleMessage)msg).getValue()); + assertTrue(requestHandler.sendResponse(new Response(Response.Status.OK))); + + assertNotNull(driver.awaitSuccess()); + assertTrue(driver.close()); + } + + @Test + public void requireThatReplyInsideMbusResponseIsUsed() { + MyRequestHandler requestHandler = MyRequestHandler.newInstance(); + ServerTestDriver driver = ServerTestDriver.newInstance(requestHandler, true); + assertTrue(driver.sendMessage(new SimpleMessage("foo"))); + + assertNotNull(requestHandler.awaitRequest()); + Reply reply = new SimpleReply("bar"); + reply.swapState(((MbusRequest)requestHandler.request).getMessage()); + assertTrue(requestHandler.sendResponse(new MbusResponse(Response.Status.OK, reply))); + + reply = driver.awaitSuccess(); + assertTrue(reply instanceof SimpleReply); + assertEquals("bar", ((SimpleReply)reply).getValue()); + + assertTrue(driver.close()); + } + + @Test + public void requireThatNonMbusResponseCausesEmptyReply() { + MyRequestHandler requestHandler = MyRequestHandler.newInstance(); + ServerTestDriver driver = ServerTestDriver.newInstance(requestHandler, true); + assertTrue(driver.sendMessage(new SimpleMessage("foo"))); + + assertNotNull(requestHandler.awaitRequest()); + assertTrue(requestHandler.sendResponse(new Response(Response.Status.OK))); + + assertNotNull(driver.awaitSuccess()); + assertTrue(driver.close()); + } + + @Test + public void requireThatMbusRequestContentCallsCompletion() throws InterruptedException { + MyRequestHandler requestHandler = MyRequestHandler.newInstance(); + ServerTestDriver driver = ServerTestDriver.newInstance(requestHandler, true); + assertTrue(driver.sendMessage(new SimpleMessage("foo"))); + + assertNotNull(requestHandler.awaitRequest()); + ContentChannel content = requestHandler.responseHandler.handleResponse(new Response(Response.Status.OK)); + assertNotNull(content); + MyCompletion completion = new MyCompletion(); + content.close(completion); + assertTrue(completion.completedLatch.await(60, TimeUnit.SECONDS)); + + assertNotNull(driver.awaitSuccess()); + assertTrue(driver.close()); + } + + @Test + public void requireThatResponseContentDoesNotSupportWrite() { + MyRequestHandler requestHandler = MyRequestHandler.newInstance(); + ServerTestDriver driver = ServerTestDriver.newInstance(requestHandler, true); + assertTrue(driver.sendMessage(new SimpleMessage("foo"))); + + assertNotNull(requestHandler.awaitRequest()); + ContentChannel content = requestHandler.responseHandler.handleResponse(new Response(Response.Status.OK)); + assertNotNull(content); + try { + content.write(ByteBuffer.allocate(69), null); + fail(); + } catch (UnsupportedOperationException e) { + + } + content.close(null); + + assertNotNull(driver.awaitSuccess()); + assertTrue(driver.close()); + } + + @Test + public void requireThatResponseErrorCodeDoesNotDuplicateReplyError() { + assertError(Collections.<Integer>emptyList(), + Response.Status.OK); + assertError(Arrays.asList(ErrorCode.APP_FATAL_ERROR), + Response.Status.BAD_REQUEST); + assertError(Arrays.asList(ErrorCode.FATAL_ERROR), + Response.Status.BAD_REQUEST, ErrorCode.FATAL_ERROR); + assertError(Arrays.asList(ErrorCode.TRANSIENT_ERROR, ErrorCode.APP_FATAL_ERROR), + Response.Status.BAD_REQUEST, ErrorCode.TRANSIENT_ERROR); + assertError(Arrays.asList(ErrorCode.FATAL_ERROR, ErrorCode.TRANSIENT_ERROR), + Response.Status.BAD_REQUEST, ErrorCode.FATAL_ERROR, ErrorCode.TRANSIENT_ERROR); + } + + private static void assertError(List<Integer> expectedErrors, int responseStatus, int... responseErrors) { + MyRequestHandler requestHandler = MyRequestHandler.newInstance(); + ServerTestDriver driver = ServerTestDriver.newInstance(requestHandler, true); + assertTrue(driver.sendMessage(new SimpleMessage("foo"))); + + assertNotNull(requestHandler.awaitRequest()); + Reply reply = new SimpleReply("bar"); + reply.swapState(((MbusRequest)requestHandler.request).getMessage()); + for (int err : responseErrors) { + reply.addError(new Error(err, "err")); + } + assertTrue(requestHandler.sendResponse(new MbusResponse(responseStatus, reply))); + + assertNotNull(reply = driver.awaitReply()); + List<Integer> actual = new LinkedList<>(); + for (int i = 0; i < reply.getNumErrors(); ++i) { + actual.add(reply.getError(i).getCode()); + } + assertEquals(expectedErrors, actual); + assertTrue(driver.close()); + } + + private static class MySelector extends AbstractModule implements BindingSetSelector { + + final String bindingSet; + + MySelector(String bindingSet) { + this.bindingSet = bindingSet; + } + + @Override + protected void configure() { + bind(BindingSetSelector.class).toInstance(this); + } + + @Override + public String select(URI uri) { + return bindingSet; + } + } + + private static class MyRequestHandler extends AbstractRequestHandler { + + final MyRequestContent content; + Request request; + ResponseHandler responseHandler; + + MyRequestHandler(MyRequestContent content) { + this.content = content; + } + + @Override + public ContentChannel handleRequest(Request request, ResponseHandler responseHandler) { + this.request = request; + this.responseHandler = responseHandler; + if (content == null) { + throw new RequestDeniedException(request); + } + return content; + } + + Request awaitRequest() { + try { + if (!content.closeLatch.await(60, TimeUnit.SECONDS)) { + return null; + } + } catch (InterruptedException e) { + throw new IllegalStateException(e); + } + if (request instanceof MbusRequest) { + ((MbusRequest)request).getMessage().getTrace().trace(0, "Request received by DISC."); + } + return request; + } + + boolean sendResponse(Response response) { + ContentChannel content = responseHandler.handleResponse(response); + if (content == null) { + return false; + } + content.close(null); + return true; + } + + static MyRequestHandler newInstance() { + return new MyRequestHandler(new MyRequestContent()); + } + + static MyRequestHandler newRequestDenied() { + return new MyRequestHandler(null); + } + } + + private static class MyRequestContent implements ContentChannel { + + final CountDownLatch writeLatch = new CountDownLatch(1); + final CountDownLatch closeLatch = new CountDownLatch(1); + + @Override + public void write(ByteBuffer buf, CompletionHandler handler) { + if (handler != null) { + handler.completed(); + } + writeLatch.countDown(); + } + + @Override + public void close(CompletionHandler handler) { + if (handler != null) { + handler.completed(); + } + closeLatch.countDown(); + } + } + + private static class MyCompletion implements CompletionHandler { + + final CountDownLatch completedLatch = new CountDownLatch(1); + + @Override + public void completed() { + completedLatch.countDown(); + } + + @Override + public void failed(Throwable t) { + + } + } + + private static class MySession implements ServerSession { + + int refCount = 1; + + @Override + public void sendReply(Reply reply) { + + } + + @Override + public MessageHandler getMessageHandler() { + return null; + } + + @Override + public void setMessageHandler(MessageHandler msgHandler) { + + } + + @Override + public String connectionSpec() { + return null; + } + + @Override + public String name() { + return null; + } + + @Override + public ResourceReference refer() { + ++refCount; + return new ResourceReference() { + @Override + public void close() { + --refCount; + } + }; + } + + @Override + public void release() { + --refCount; + } + } +} diff --git a/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/ServerThreadingTestCase.java b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/ServerThreadingTestCase.java new file mode 100644 index 00000000000..a7ee355094f --- /dev/null +++ b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/ServerThreadingTestCase.java @@ -0,0 +1,137 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.jdisc; + +import com.yahoo.jdisc.application.ContainerBuilder; +import com.yahoo.jdisc.service.CurrentContainer; +import com.yahoo.jdisc.test.TestDriver; +import com.yahoo.messagebus.DestinationSessionParams; +import com.yahoo.messagebus.EmptyReply; +import com.yahoo.messagebus.Message; +import com.yahoo.messagebus.MessageBus; +import com.yahoo.messagebus.MessageBusParams; +import com.yahoo.messagebus.Reply; +import com.yahoo.messagebus.ReplyHandler; +import com.yahoo.messagebus.SourceSession; +import com.yahoo.messagebus.SourceSessionParams; +import com.yahoo.messagebus.network.local.LocalNetwork; +import com.yahoo.messagebus.network.local.LocalWire; +import com.yahoo.messagebus.routing.Route; +import com.yahoo.messagebus.shared.SharedDestinationSession; +import com.yahoo.messagebus.shared.SharedMessageBus; +import com.yahoo.messagebus.test.SimpleMessage; +import com.yahoo.messagebus.test.SimpleProtocol; +import org.junit.Test; + +import java.util.concurrent.BlockingDeque; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + +/** + * @author Simon Thoresen Hult + */ +public class ServerThreadingTestCase { + + private static final int NUM_THREADS = 32; + private static final int NUM_REQUESTS = 1000; + + @Test + public void requireThatServerIsThreadSafe() throws Exception { + final LocalWire wire = new LocalWire(); + final Client client = new Client(wire); + final Server server = new Server(wire); + + for (int i = 0; i < NUM_REQUESTS; ++i) { + final Message msg = new SimpleMessage("foo"); + msg.setRoute(Route.parse(server.delegate.connectionSpec())); + msg.pushHandler(client); + assertThat(client.session.send(msg).isAccepted(), is(true)); + } + for (int i = 0; i < NUM_REQUESTS; ++i) { + final Reply reply = client.replies.poll(600, TimeUnit.SECONDS); + assertThat(reply, instanceOf(EmptyReply.class)); + assertThat(reply.hasErrors(), is(false)); + } + + assertThat(client.close(), is(true)); + assertThat(server.close(), is(true)); + } + + private static class Client implements ReplyHandler { + + final BlockingDeque<Reply> replies = new LinkedBlockingDeque<>(); + final MessageBus mbus; + final SourceSession session; + + Client(final LocalWire wire) { + mbus = new MessageBus( + new LocalNetwork(wire), + new MessageBusParams().addProtocol(new SimpleProtocol())); + session = mbus.createSourceSession( + new SourceSessionParams() + .setReplyHandler(this) + .setThrottlePolicy(null)); + } + + @Override + public void handleReply(final Reply reply) { + replies.addLast(reply); + } + + boolean close() { + return session.destroy() && mbus.destroy(); + } + } + + private static class Server extends MbusRequestHandler { + + final Executor executor = Executors.newFixedThreadPool(NUM_THREADS); + final MbusServer delegate; + final TestDriver driver; + + Server(final LocalWire wire) { + driver = TestDriver.newSimpleApplicationInstanceWithoutOsgi(); + delegate = newMbusServer(driver, wire); + + final ContainerBuilder builder = driver.newContainerBuilder(); + builder.serverBindings().bind("mbus://*/*", this); + driver.activateContainer(builder); + delegate.start(); + } + + @Override + public void handleMessage(final Message msg) { + executor.execute(new Runnable() { + + @Override + public void run() { + final Reply reply = new EmptyReply(); + reply.swapState(msg); + reply.popHandler().handleReply(reply); + } + }); + } + + boolean close() { + delegate.release(); + return driver.close(); + } + } + + private static MbusServer newMbusServer(final CurrentContainer container, final LocalWire wire) { + final SharedMessageBus mbus = new SharedMessageBus(new MessageBus( + new LocalNetwork(wire), + new MessageBusParams().addProtocol(new SimpleProtocol()))); + final SharedDestinationSession session = mbus.newDestinationSession( + new DestinationSessionParams()); + final MbusServer server = new MbusServer(container, session); + session.release(); + mbus.release(); + return server; + } +} diff --git a/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/test/ClientTestDriverTestCase.java b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/test/ClientTestDriverTestCase.java new file mode 100644 index 00000000000..ef290a070cb --- /dev/null +++ b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/test/ClientTestDriverTestCase.java @@ -0,0 +1,32 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.jdisc.test; + +import com.yahoo.jrt.ListenFailedException; +import com.yahoo.jrt.slobrok.server.Slobrok; +import com.yahoo.messagebus.test.SimpleProtocol; +import org.junit.Test; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +/** + * @author Simon Thoresen Hult + */ +public class ClientTestDriverTestCase { + + @Test + public void requireThatFactoryMethodsWork() throws ListenFailedException { + ClientTestDriver driver = ClientTestDriver.newInstance(); + assertNotNull(driver); + assertTrue(driver.close()); + + driver = ClientTestDriver.newInstanceWithProtocol(new SimpleProtocol()); + assertNotNull(driver); + assertTrue(driver.close()); + + Slobrok slobrok = new Slobrok(); + driver = ClientTestDriver.newInstanceWithExternSlobrok(slobrok.configId()); + assertNotNull(driver); + assertTrue(driver.close()); + } +} diff --git a/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/test/ServerTestDriverTestCase.java b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/test/ServerTestDriverTestCase.java new file mode 100644 index 00000000000..f6ae2335d12 --- /dev/null +++ b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/test/ServerTestDriverTestCase.java @@ -0,0 +1,34 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.jdisc.test; + +import com.yahoo.jdisc.test.NonWorkingRequestHandler; +import com.yahoo.jrt.ListenFailedException; +import com.yahoo.jrt.slobrok.server.Slobrok; +import com.yahoo.messagebus.test.SimpleProtocol; +import org.junit.Test; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +/** + * @author Simon Thoresen Hult + */ +public class ServerTestDriverTestCase { + + @Test + public void requireThatFactoryMethodsWork() throws ListenFailedException { + ServerTestDriver driver = ServerTestDriver.newInstance(new NonWorkingRequestHandler(), false); + assertNotNull(driver); + assertTrue(driver.close()); + + driver = ServerTestDriver.newInstanceWithProtocol(new SimpleProtocol(), new NonWorkingRequestHandler(), false); + assertNotNull(driver); + assertTrue(driver.close()); + + Slobrok slobrok = new Slobrok(); + driver = ServerTestDriver.newInstanceWithExternSlobrok(slobrok.configId(), new NonWorkingRequestHandler(), false); + assertNotNull(driver); + assertTrue(driver.close()); + } + +} |