diff options
Diffstat (limited to 'jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/ServerThreadingTestCase.java')
-rw-r--r-- | jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/ServerThreadingTestCase.java | 137 |
1 files changed, 137 insertions, 0 deletions
diff --git a/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/ServerThreadingTestCase.java b/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/ServerThreadingTestCase.java new file mode 100644 index 00000000000..99ed1bd5877 --- /dev/null +++ b/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/ServerThreadingTestCase.java @@ -0,0 +1,137 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.jdisc; + +import com.yahoo.jdisc.application.ContainerBuilder; +import com.yahoo.jdisc.service.CurrentContainer; +import com.yahoo.jdisc.test.TestDriver; +import com.yahoo.messagebus.DestinationSessionParams; +import com.yahoo.messagebus.EmptyReply; +import com.yahoo.messagebus.Message; +import com.yahoo.messagebus.MessageBus; +import com.yahoo.messagebus.MessageBusParams; +import com.yahoo.messagebus.Reply; +import com.yahoo.messagebus.ReplyHandler; +import com.yahoo.messagebus.SourceSession; +import com.yahoo.messagebus.SourceSessionParams; +import com.yahoo.messagebus.network.local.LocalNetwork; +import com.yahoo.messagebus.network.local.LocalWire; +import com.yahoo.messagebus.routing.Route; +import com.yahoo.messagebus.shared.SharedDestinationSession; +import com.yahoo.messagebus.shared.SharedMessageBus; +import com.yahoo.messagebus.test.SimpleMessage; +import com.yahoo.messagebus.test.SimpleProtocol; +import org.junit.Test; + +import java.util.concurrent.BlockingDeque; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + +/** + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +public class ServerThreadingTestCase { + + private static final int NUM_THREADS = 32; + private static final int NUM_REQUESTS = 1000; + + @Test + public void requireThatServerIsThreadSafe() throws Exception { + final LocalWire wire = new LocalWire(); + final Client client = new Client(wire); + final Server server = new Server(wire); + + for (int i = 0; i < NUM_REQUESTS; ++i) { + final Message msg = new SimpleMessage("foo"); + msg.setRoute(Route.parse(server.delegate.connectionSpec())); + msg.pushHandler(client); + assertThat(client.session.send(msg).isAccepted(), is(true)); + } + for (int i = 0; i < NUM_REQUESTS; ++i) { + final Reply reply = client.replies.poll(600, TimeUnit.SECONDS); + assertThat(reply, instanceOf(EmptyReply.class)); + assertThat(reply.hasErrors(), is(false)); + } + + assertThat(client.close(), is(true)); + assertThat(server.close(), is(true)); + } + + private static class Client implements ReplyHandler { + + final BlockingDeque<Reply> replies = new LinkedBlockingDeque<>(); + final MessageBus mbus; + final SourceSession session; + + Client(final LocalWire wire) { + mbus = new MessageBus( + new LocalNetwork(wire), + new MessageBusParams().addProtocol(new SimpleProtocol())); + session = mbus.createSourceSession( + new SourceSessionParams() + .setReplyHandler(this) + .setThrottlePolicy(null)); + } + + @Override + public void handleReply(final Reply reply) { + replies.addLast(reply); + } + + boolean close() { + return session.destroy() && mbus.destroy(); + } + } + + private static class Server extends MbusRequestHandler { + + final Executor executor = Executors.newFixedThreadPool(NUM_THREADS); + final MbusServer delegate; + final TestDriver driver; + + Server(final LocalWire wire) { + driver = TestDriver.newSimpleApplicationInstanceWithoutOsgi(); + delegate = newMbusServer(driver, wire); + + final ContainerBuilder builder = driver.newContainerBuilder(); + builder.serverBindings().bind("mbus://*/*", this); + driver.activateContainer(builder); + delegate.start(); + } + + @Override + public void handleMessage(final Message msg) { + executor.execute(new Runnable() { + + @Override + public void run() { + final Reply reply = new EmptyReply(); + reply.swapState(msg); + reply.popHandler().handleReply(reply); + } + }); + } + + boolean close() { + delegate.release(); + return driver.close(); + } + } + + private static MbusServer newMbusServer(final CurrentContainer container, final LocalWire wire) { + final SharedMessageBus mbus = new SharedMessageBus(new MessageBus( + new LocalNetwork(wire), + new MessageBusParams().addProtocol(new SimpleProtocol()))); + final SharedDestinationSession session = mbus.newDestinationSession( + new DestinationSessionParams()); + final MbusServer server = new MbusServer(container, session); + session.release(); + mbus.release(); + return server; + } +} |