diff options
Diffstat (limited to 'container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusClientTestCase.java')
-rw-r--r-- | container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusClientTestCase.java | 345 |
1 files changed, 345 insertions, 0 deletions
diff --git a/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusClientTestCase.java b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusClientTestCase.java new file mode 100644 index 00000000000..9cfd1fd02b9 --- /dev/null +++ b/container-messagebus/src/test/java/com/yahoo/messagebus/jdisc/MbusClientTestCase.java @@ -0,0 +1,345 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.jdisc; + +import com.yahoo.jdisc.Request; +import com.yahoo.jdisc.ResourceReference; +import com.yahoo.jdisc.Response; +import com.yahoo.jdisc.handler.CompletionHandler; +import com.yahoo.jdisc.handler.ContentChannel; +import com.yahoo.jdisc.handler.RequestDeniedException; +import com.yahoo.jdisc.handler.ResponseHandler; +import com.yahoo.messagebus.*; +import com.yahoo.messagebus.jdisc.test.ClientTestDriver; +import com.yahoo.messagebus.shared.ClientSession; +import com.yahoo.messagebus.test.SimpleMessage; +import com.yahoo.messagebus.test.SimpleReply; +import org.junit.Test; + +import java.net.URI; +import java.nio.ByteBuffer; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.*; + +/** + * @author Simon Thoresen Hult + */ +public class MbusClientTestCase { + + @Test + public void requireThatClientRetainsSession() { + MySession session = new MySession(); + assertEquals(1, session.refCount); + MbusClient client = new MbusClient(session); + assertEquals(2, session.refCount); + session.release(); + assertEquals(1, session.refCount); + client.destroy(); + assertEquals(0, session.refCount); + } + + @Test + public void requireThatRequestResponseWorks() { + ClientTestDriver driver = ClientTestDriver.newInstance(); + MyResponseHandler responseHandler = MyResponseHandler.newInstance(); + assertTrue(driver.sendMessage(new SimpleMessage("foo"), responseHandler)); + assertTrue(driver.awaitMessageAndSendReply(new EmptyReply())); + + Response response = responseHandler.awaitResponse(); + assertNotNull(response); + assertEquals(Response.Status.OK, response.getStatus()); + assertTrue(driver.close()); + } + + @Test + public void requireThatNonMbusRequestIsDenied() throws InterruptedException { + ClientTestDriver driver = ClientTestDriver.newInstance(); + Request serverReq = null; + Request clientReq = null; + try { + serverReq = driver.newServerRequest(); + clientReq = new Request(serverReq, URI.create("mbus://host/path")); + clientReq.connect(MyResponseHandler.newInstance()); + fail(); + } catch (RequestDeniedException e) { + System.out.println(e.getMessage()); + } finally { + if (serverReq != null) { + serverReq.release(); + } + if (clientReq != null) { + clientReq.release(); + } + } + assertTrue(driver.close()); + } + + @Test + public void requireThatRequestContentDoesNotSupportWrite() throws InterruptedException { + ClientTestDriver driver = ClientTestDriver.newInstance(); + MyResponseHandler responseHandler = MyResponseHandler.newInstance(); + + Request request = null; + ContentChannel content; + try { + request = driver.newClientRequest(new SimpleMessage("foo")); + content = request.connect(responseHandler); + } finally { + if (request != null) { + request.release(); + } + } + try { + content.write(ByteBuffer.allocate(69), null); + fail(); + } catch (UnsupportedOperationException e) { + + } + content.close(null); + + assertTrue(driver.awaitMessageAndSendReply(new EmptyReply())); + assertNotNull(responseHandler.awaitResponse()); + assertTrue(driver.close()); + } + + @Test + public void requireThatResponseIsMbus() { + ClientTestDriver driver = ClientTestDriver.newInstance(); + MyResponseHandler responseHandler = MyResponseHandler.newInstance(); + assertTrue(driver.sendMessage(new SimpleMessage("foo"), responseHandler)); + assertTrue(driver.awaitMessageAndSendReply(new EmptyReply())); + + Response response = responseHandler.awaitResponse(); + assertTrue(response instanceof MbusResponse); + Reply reply = ((MbusResponse)response).getReply(); + assertTrue(reply instanceof EmptyReply); + assertTrue(driver.close()); + } + + @Test + public void requireThatServerReceivesGivenMessage() { + ClientTestDriver driver = ClientTestDriver.newInstance(); + MyResponseHandler responseHandler = MyResponseHandler.newInstance(); + assertTrue(driver.sendMessage(new SimpleMessage("foo"), responseHandler)); + + Message msg = driver.awaitMessage(); + assertTrue(msg instanceof SimpleMessage); + assertEquals("foo", ((SimpleMessage)msg).getValue()); + + Reply reply = new EmptyReply(); + reply.swapState(msg); + driver.sendReply(reply); + + assertNotNull(responseHandler.awaitResponse()); + assertTrue(driver.close()); + } + + @Test + public void requireThatClientReceivesGivenReply() { + ClientTestDriver driver = ClientTestDriver.newInstance(); + MyResponseHandler responseHandler = MyResponseHandler.newInstance(); + assertTrue(driver.sendMessage(new SimpleMessage("foo"), responseHandler)); + + Message msg = driver.awaitMessage(); // TODO: Timing sensitive + assertNotNull(msg); + Reply reply = new SimpleReply("bar"); + reply.swapState(msg); + driver.sendReply(reply); + + Response response = responseHandler.awaitResponse(); + assertTrue(response instanceof MbusResponse); + reply = ((MbusResponse)response).getReply(); + assertTrue(reply instanceof SimpleReply); + assertEquals("bar", ((SimpleReply)reply).getValue()); + assertTrue(driver.close()); + } + + @Test + public void requireThatStateIsTransferredToResponse() { + ClientTestDriver driver = ClientTestDriver.newInstance(); + MyResponseHandler responseHandler = MyResponseHandler.newInstance(); + + Message msg = new SimpleMessage("foo"); + Object pushedCtx = new Object(); + msg.setContext(pushedCtx); + ReplyHandler pushedHandler = new MyReplyHandler(); + msg.pushHandler(pushedHandler); + Object currentCtx = new Object(); + msg.setContext(currentCtx); + msg.getTrace().setLevel(6); + assertTrue(driver.sendMessage(msg, responseHandler)); + assertTrue(driver.awaitMessageAndSendReply(new EmptyReply())); + + Response response = responseHandler.awaitResponse(); + assertTrue(response.getClass().getName(), response instanceof MbusResponse); + Reply reply = ((MbusResponse)response).getReply(); + assertSame(currentCtx, reply.getContext()); + assertEquals(6, reply.getTrace().getLevel()); + assertSame(pushedHandler, reply.popHandler()); + assertSame(pushedCtx, reply.getContext()); + assertTrue(driver.close()); + } + + @Test + public void requireThatStateIsTransferredToSyncMbusSendFailureResponse() { + ClientTestDriver driver = ClientTestDriver.newInstance(); + driver.sourceSession().close(); + + Message msg = new SimpleMessage("foo"); + ReplyHandler pushedHandler = new MyReplyHandler(); + Object pushedCtx = new Object(); + msg.setContext(pushedCtx); + msg.pushHandler(pushedHandler); + Object currentCtx = new Object(); + msg.setContext(currentCtx); + msg.getTrace().setLevel(6); + + MyResponseHandler responseHandler = MyResponseHandler.newInstance(); + driver.sendMessage(msg, responseHandler); + + Response response = responseHandler.awaitResponse(); + assertNotNull(response); + assertTrue(response.getClass().getName(), response instanceof MbusResponse); + Reply reply = ((MbusResponse)response).getReply(); + assertSame(currentCtx, reply.getContext()); + assertEquals(6, reply.getTrace().getLevel()); + assertSame(pushedHandler, reply.popHandler()); + assertSame(pushedCtx, reply.getContext()); + assertTrue(driver.close()); + } + + @Test + public void requireThatStateIsTransferredToTimeoutResponse() throws InterruptedException { + ClientTestDriver driver = ClientTestDriver.newInstance(); + MyResponseHandler responseHandler = MyResponseHandler.newInstance(); + + Message msg = new SimpleMessage("foo"); + ReplyHandler pushedHandler = new MyReplyHandler(); + Object pushedCtx = new Object(); + msg.setContext(pushedCtx); + msg.pushHandler(pushedHandler); + Object currentCtx = new Object(); + msg.setContext(currentCtx); + msg.getTrace().setLevel(6); + + Request request = driver.newClientRequest(msg); + request.setTimeout(1, TimeUnit.MILLISECONDS); + assertTrue(driver.sendRequest(request, responseHandler)); + request.release(); + + Response response = responseHandler.awaitResponse(); + assertNotNull(response); + assertTrue(response.getClass().getName(), response instanceof MbusResponse); + Reply reply = ((MbusResponse)response).getReply(); + assertSame(currentCtx, reply.getContext()); + assertEquals(6, reply.getTrace().getLevel()); + assertSame(pushedHandler, reply.popHandler()); + assertSame(pushedCtx, reply.getContext()); + assertTrue(driver.close()); + } + + @Test + public void requireThatSyncMbusSendFailureRespondsWithError() { + ClientTestDriver driver = ClientTestDriver.newInstance(); + driver.sourceSession().close(); + + MyResponseHandler responseHandler = MyResponseHandler.newInstance(); + driver.sendMessage(new SimpleMessage("foo"), responseHandler); + Response response = responseHandler.awaitResponse(); + assertNotNull(response); + assertTrue(response.getClass().getName(), response instanceof MbusResponse); + Reply reply = ((MbusResponse)response).getReply(); + assertEquals(1, reply.getNumErrors()); + assertEquals(ErrorCode.SEND_QUEUE_CLOSED, reply.getError(0).getCode()); + assertTrue(driver.close()); + } + + private static class MyResponseHandler implements ResponseHandler { + + final MyResponseContent content; + Response response; + + MyResponseHandler(MyResponseContent content) { + this.content = content; + } + + Response awaitResponse() { + try { + content.closeLatch.await(60, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw new IllegalStateException(e); + } + if (response instanceof MbusResponse) { + //System.out.println(((MbusResponse)response).getReply().getTrace()); + } + return response; + } + + @Override + public ContentChannel handleResponse(Response response) { + this.response = response; + return content; + } + + static MyResponseHandler newInstance() { + return new MyResponseHandler(new MyResponseContent()); + } + } + + private static class MyResponseContent implements ContentChannel { + + final CountDownLatch writeLatch = new CountDownLatch(1); + final CountDownLatch closeLatch = new CountDownLatch(1); + + @Override + public void write(ByteBuffer buf, CompletionHandler handler) { + if (handler != null) { + handler.completed(); + } + writeLatch.countDown(); + } + + @Override + public void close(CompletionHandler handler) { + if (handler != null) { + handler.completed(); + } + closeLatch.countDown(); + } + } + + private static class MySession implements ClientSession { + + int refCount = 1; + + @Override + public Result sendMessage(Message msg) { + return null; + } + + @Override + public ResourceReference refer() { + ++refCount; + return new ResourceReference() { + @Override + public void close() { + --refCount; + } + }; + } + + @Override + public void release() { + --refCount; + } + } + + private static class MyReplyHandler implements ReplyHandler { + + @Override + public void handleReply(Reply reply) { + + } + } +} |