diff options
author | Jon Bratseth <bratseth@yahoo-inc.com> | 2016-06-15 23:09:44 +0200 |
---|---|---|
committer | Jon Bratseth <bratseth@yahoo-inc.com> | 2016-06-15 23:09:44 +0200 |
commit | 72231250ed81e10d66bfe70701e64fa5fe50f712 (patch) | |
tree | 2728bba1131a6f6e5bdf95afec7d7ff9358dac50 /messagebus/src/test |
Publish
Diffstat (limited to 'messagebus/src/test')
40 files changed, 6032 insertions, 0 deletions
diff --git a/messagebus/src/test/files/.gitignore b/messagebus/src/test/files/.gitignore new file mode 100644 index 00000000000..b1f2d513ab4 --- /dev/null +++ b/messagebus/src/test/files/.gitignore @@ -0,0 +1 @@ +test.cfg diff --git a/messagebus/src/test/java/com/yahoo/messagebus/ChokeTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/ChokeTestCase.java new file mode 100755 index 00000000000..cde801d81f2 --- /dev/null +++ b/messagebus/src/test/java/com/yahoo/messagebus/ChokeTestCase.java @@ -0,0 +1,169 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus; + +import com.yahoo.jrt.ListenFailedException; +import com.yahoo.jrt.slobrok.server.Slobrok; +import com.yahoo.messagebus.network.Identity; +import com.yahoo.messagebus.network.rpc.RPCNetworkParams; +import com.yahoo.messagebus.network.rpc.test.TestServer; +import com.yahoo.messagebus.routing.Route; +import com.yahoo.messagebus.test.Receptor; +import com.yahoo.messagebus.test.SimpleMessage; +import com.yahoo.messagebus.test.SimpleProtocol; +import junit.framework.TestCase; + +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; + +/** + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +public class ChokeTestCase extends TestCase { + + //////////////////////////////////////////////////////////////////////////////// + // + // Setup + // + //////////////////////////////////////////////////////////////////////////////// + + Slobrok slobrok; + TestServer srcServer, dstServer; + SourceSession srcSession; + DestinationSession dstSession; + + @Override + public void setUp() throws ListenFailedException, UnknownHostException { + slobrok = new Slobrok(); + dstServer = new TestServer(new MessageBusParams().addProtocol(new SimpleProtocol()), + new RPCNetworkParams().setIdentity(new Identity("dst")).setSlobrokConfigId(TestServer.getSlobrokConfig(slobrok))); + dstSession = dstServer.mb.createDestinationSession(new DestinationSessionParams().setName("session").setMessageHandler(new Receptor())); + srcServer = new TestServer(new MessageBusParams().setRetryPolicy(null).addProtocol(new SimpleProtocol()), + new RPCNetworkParams().setSlobrokConfigId(TestServer.getSlobrokConfig(slobrok))); + srcSession = srcServer.mb.createSourceSession( + new SourceSessionParams().setTimeout(600.0).setThrottlePolicy(null).setReplyHandler(new Receptor())); + assertTrue(srcServer.waitSlobrok("dst/session", 1)); + } + + @Override + public void tearDown() { + slobrok.stop(); + dstSession.destroy(); + dstServer.destroy(); + srcSession.destroy(); + srcServer.destroy(); + } + + //////////////////////////////////////////////////////////////////////////////// + // + // Tests + // + //////////////////////////////////////////////////////////////////////////////// + + public void testMaxCount() { + int max = 10; + dstServer.mb.setMaxPendingCount(max); + List<Message> lst = new ArrayList<>(); + for (int i = 0; i < max * 2; ++i) { + if (i < max) { + assertEquals(i, dstServer.mb.getPendingCount()); + } else { + assertEquals(max, dstServer.mb.getPendingCount()); + } + assertTrue(srcSession.send(createMessage("msg"), Route.parse("dst/session")).isAccepted()); + if (i < max) { + Message msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60); + assertNotNull(msg); + lst.add(msg); + } else { + Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60); + assertNotNull(reply); + assertEquals(1, reply.getNumErrors()); + assertEquals(ErrorCode.SESSION_BUSY, reply.getError(0).getCode()); + } + } + for (int i = 0; i < 5; ++i) { + Message msg = lst.remove(0); + dstSession.acknowledge(msg); + + Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60); + assertNotNull(reply); + assertFalse(reply.hasErrors()); + assertNotNull(msg = reply.getMessage()); + assertTrue(srcSession.send(msg, Route.parse("dst/session")).isAccepted()); + + assertNotNull(msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60)); + lst.add(msg); + } + while (!lst.isEmpty()) { + assertEquals(lst.size(), dstServer.mb.getPendingCount()); + Message msg = lst.remove(0); + dstSession.acknowledge(msg); + + Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60); + assertNotNull(reply); + assertFalse(reply.hasErrors()); + } + assertEquals(0, dstServer.mb.getPendingCount()); + } + + public void testMaxSize() { + int size = createMessage("msg").getApproxSize(); + int max = size * 10; + dstServer.mb.setMaxPendingSize(max); + List<Message> lst = new ArrayList<>(); + for (int i = 0; i < max * 2; i += size) { + if (i < max) { + assertEquals(i, dstServer.mb.getPendingSize()); + } else { + assertEquals(max, dstServer.mb.getPendingSize()); + } + assertTrue(srcSession.send(createMessage("msg"), Route.parse("dst/session")).isAccepted()); + if (i < max) { + Message msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60); + assertNotNull(msg); + lst.add(msg); + } else { + Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60); + assertNotNull(reply); + assertEquals(1, reply.getNumErrors()); + assertEquals(ErrorCode.SESSION_BUSY, reply.getError(0).getCode()); + } + } + for (int i = 0; i < 5; ++i) { + Message msg = lst.remove(0); + dstSession.acknowledge(msg); + + Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60); + assertNotNull(reply); + assertFalse(reply.hasErrors()); + assertNotNull(msg = reply.getMessage()); + assertTrue(srcSession.send(msg, Route.parse("dst/session")).isAccepted()); + + assertNotNull(msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60)); + lst.add(msg); + } + while (!lst.isEmpty()) { + assertEquals(size * lst.size(), dstServer.mb.getPendingSize()); + Message msg = lst.remove(0); + dstSession.acknowledge(msg); + + Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60); + assertNotNull(reply); + assertFalse(reply.hasErrors()); + } + assertEquals(0, dstServer.mb.getPendingSize()); + } + + //////////////////////////////////////////////////////////////////////////////// + // + // Utilities + // + //////////////////////////////////////////////////////////////////////////////// + + private static Message createMessage(String msg) { + Message ret = new SimpleMessage(msg); + ret.getTrace().setLevel(9); + return ret; + } +} diff --git a/messagebus/src/test/java/com/yahoo/messagebus/ConfigAgentTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/ConfigAgentTestCase.java new file mode 100755 index 00000000000..f6b21e26030 --- /dev/null +++ b/messagebus/src/test/java/com/yahoo/messagebus/ConfigAgentTestCase.java @@ -0,0 +1,199 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus; + +import com.yahoo.config.subscription.ConfigSet; +import com.yahoo.config.subscription.ConfigURI; +import com.yahoo.messagebus.routing.HopSpec; +import com.yahoo.messagebus.routing.RouteSpec; +import com.yahoo.messagebus.routing.RoutingSpec; +import com.yahoo.messagebus.routing.RoutingTableSpec; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.io.PrintWriter; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.*; + +/** + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +public class ConfigAgentTestCase { + + @Rule + public TemporaryFolder tmpFolder = new TemporaryFolder(); + + @Test + public void testRoutingConfig() throws InterruptedException, IOException { + LocalHandler handler = new LocalHandler(); + assertFalse(testHalf(handler.spec)); + assertFalse(testFull(handler.spec)); + + ConfigSet set = new ConfigSet(); + set.addBuilder("test", writeFull()); + + ConfigAgent agent = new ConfigAgent(ConfigURI.createFromIdAndSource("test", set), handler); + assertFalse(testHalf(handler.spec)); + assertFalse(testFull(handler.spec)); + agent.subscribe(); + assertFalse(testHalf(handler.spec)); + assertTrue(testFull(handler.spec)); + + handler.reset(); + set.addBuilder("test", writeHalf()); + assertTrue(handler.await(120, TimeUnit.SECONDS)); + assertTrue(testHalf(handler.spec)); + assertFalse(testFull(handler.spec)); + + handler.reset(); + set.addBuilder("test", writeFull()); + assertTrue(handler.await(120, TimeUnit.SECONDS)); + assertTrue(testFull(handler.spec)); + assertFalse(testHalf(handler.spec)); + } + + private boolean testHalf(RoutingSpec spec) { + if (spec.getNumTables() != 1) { + return false; + } + assertTables(spec, 1); + return true; + } + + private boolean testFull(RoutingSpec spec) { + if (spec.getNumTables() != 2) { + return false; + } + assertTables(spec, 2); + return true; + } + + private void assertTables(RoutingSpec spec, int numTables) { + assertEquals(numTables, spec.getNumTables()); + if (numTables > 0) { + assertEquals("foo", spec.getTable(0).getProtocol()); + assertEquals(2, spec.getTable(0).getNumHops()); + assertEquals("foo-h1", spec.getTable(0).getHop(0).getName()); + assertEquals("foo-h1-sel", spec.getTable(0).getHop(0).getSelector()); + assertEquals(2, spec.getTable(0).getHop(0).getNumRecipients()); + assertEquals("foo-h1-r1", spec.getTable(0).getHop(0).getRecipient(0)); + assertEquals("foo-h1-r2", spec.getTable(0).getHop(0).getRecipient(1)); + assertEquals(true, spec.getTable(0).getHop(0).getIgnoreResult()); + assertEquals("foo-h2", spec.getTable(0).getHop(1).getName()); + assertEquals("foo-h2-sel", spec.getTable(0).getHop(1).getSelector()); + assertEquals(2, spec.getTable(0).getHop(1).getNumRecipients()); + assertEquals("foo-h2-r1", spec.getTable(0).getHop(1).getRecipient(0)); + assertEquals("foo-h2-r2", spec.getTable(0).getHop(1).getRecipient(1)); + assertEquals(2, spec.getTable(0).getNumRoutes()); + assertEquals("foo-r1", spec.getTable(0).getRoute(0).getName()); + assertEquals(2, spec.getTable(0).getRoute(0).getNumHops()); + assertEquals("foo-h1", spec.getTable(0).getRoute(0).getHop(0)); + assertEquals("foo-h2", spec.getTable(0).getRoute(0).getHop(1)); + assertEquals("foo-r2", spec.getTable(0).getRoute(1).getName()); + assertEquals(2, spec.getTable(0).getRoute(1).getNumHops()); + assertEquals("foo-h2", spec.getTable(0).getRoute(1).getHop(0)); + assertEquals("foo-h1", spec.getTable(0).getRoute(1).getHop(1)); + } + if (numTables > 1) { + assertEquals("bar", spec.getTable(1).getProtocol()); + assertEquals(2, spec.getTable(1).getNumHops()); + assertEquals("bar-h1", spec.getTable(1).getHop(0).getName()); + assertEquals("bar-h1-sel", spec.getTable(1).getHop(0).getSelector()); + assertEquals(2, spec.getTable(1).getHop(0).getNumRecipients()); + assertEquals("bar-h1-r1", spec.getTable(1).getHop(0).getRecipient(0)); + assertEquals("bar-h1-r2", spec.getTable(1).getHop(0).getRecipient(1)); + assertEquals("bar-h2", spec.getTable(1).getHop(1).getName()); + assertEquals("bar-h2-sel", spec.getTable(1).getHop(1).getSelector()); + assertEquals(2, spec.getTable(1).getHop(1).getNumRecipients()); + assertEquals("bar-h2-r1", spec.getTable(1).getHop(1).getRecipient(0)); + assertEquals("bar-h2-r2", spec.getTable(1).getHop(1).getRecipient(1)); + assertEquals(2, spec.getTable(1).getNumRoutes()); + assertEquals("bar-r1", spec.getTable(1).getRoute(0).getName()); + assertEquals(2, spec.getTable(1).getRoute(0).getNumHops()); + assertEquals("bar-h1", spec.getTable(1).getRoute(0).getHop(0)); + assertEquals("bar-h2", spec.getTable(1).getRoute(0).getHop(1)); + assertEquals("bar-r2", spec.getTable(1).getRoute(1).getName()); + assertEquals(2, spec.getTable(1).getRoute(1).getNumHops()); + assertEquals("bar-h2", spec.getTable(1).getRoute(1).getHop(0)); + assertEquals("bar-h1", spec.getTable(1).getRoute(1).getHop(1)); + } + } + + private static MessagebusConfig.Builder writeHalf() { + return writeTables(1); + } + + private static MessagebusConfig.Builder writeFull() { + return writeTables(2); + } + + private static MessagebusConfig.Builder writeTables(int numTables) { + MessagebusConfig.Builder builder = new MessagebusConfig.Builder(); + if (numTables > 0) { + MessagebusConfig.Routingtable.Builder table = new MessagebusConfig.Routingtable.Builder(); + table.protocol("foo"); + table.hop(getHop("foo-h1", "foo-h1-sel", "foo-h1-r1", "foo-h1-r2", true)); + table.hop(getHop("foo-h2", "foo-h2-sel", "foo-h2-r1", "foo-h2-r2", false)); + table.route(getRoute("foo-r1", "foo-h1", "foo-h2")); + table.route(getRoute("foo-r2", "foo-h2", "foo-h1")); + builder.routingtable(table); + } + if (numTables > 1) { + MessagebusConfig.Routingtable.Builder table = new MessagebusConfig.Routingtable.Builder(); + table.protocol("bar"); + table.hop(getHop("bar-h1", "bar-h1-sel", "bar-h1-r1", "bar-h1-r2", false)); + table.hop(getHop("bar-h2", "bar-h2-sel", "bar-h2-r1", "bar-h2-r2", false)); + table.route(getRoute("bar-r1", "bar-h1", "bar-h2")); + table.route(getRoute("bar-r2", "bar-h2", "bar-h1")); + builder.routingtable(table); + } + return builder; + } + + private static MessagebusConfig.Routingtable.Route.Builder getRoute(String name, String hop1, String hop2) { + MessagebusConfig.Routingtable.Route.Builder route = new MessagebusConfig.Routingtable.Route.Builder(); + route.name(name); + route.hop(hop1); + route.hop(hop2); + return route; + } + + private static MessagebusConfig.Routingtable.Hop.Builder getHop(String name, String selector, String recipient1, String recipient2, boolean ignoreresult) { + MessagebusConfig.Routingtable.Hop.Builder hop = new MessagebusConfig.Routingtable.Hop.Builder(); + hop.name(name); + hop.selector(selector); + hop.recipient(recipient1); + hop.recipient(recipient2); + hop.ignoreresult(ignoreresult); + return hop; + } + + private static class LocalHandler implements ConfigHandler { + + volatile RoutingSpec spec = new RoutingSpec(); + + public void setupRouting(RoutingSpec spec) { + this.spec = spec; + } + + public void reset() { + spec = null; + } + + public boolean await(int timeout, TimeUnit unit) throws InterruptedException { + long millis = System.currentTimeMillis() + unit.toMillis(timeout); + while (spec == null) { + long now = System.currentTimeMillis(); + if (now >= millis) { + return false; + } + Thread.sleep(1000); + } + return true; + } + } +} diff --git a/messagebus/src/test/java/com/yahoo/messagebus/CustomTimer.java b/messagebus/src/test/java/com/yahoo/messagebus/CustomTimer.java new file mode 100644 index 00000000000..12749ca9bc4 --- /dev/null +++ b/messagebus/src/test/java/com/yahoo/messagebus/CustomTimer.java @@ -0,0 +1,17 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus; + +import com.yahoo.concurrent.Timer; + +/** + * @author <a href="mailto:thomasg@yahoo-inc.com">Thomas Gundersen</a> + */ +class CustomTimer implements Timer { + + long millis = 0; + + @Override + public long milliTime() { + return millis; + } +} diff --git a/messagebus/src/test/java/com/yahoo/messagebus/ErrorTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/ErrorTestCase.java new file mode 100755 index 00000000000..e67fc5030ed --- /dev/null +++ b/messagebus/src/test/java/com/yahoo/messagebus/ErrorTestCase.java @@ -0,0 +1,92 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus;
+
+import com.yahoo.jrt.slobrok.server.Slobrok;
+import com.yahoo.messagebus.network.rpc.test.TestServer;
+import com.yahoo.messagebus.routing.RoutingTableSpec;
+import com.yahoo.messagebus.test.Receptor;
+import com.yahoo.messagebus.test.SimpleMessage;
+import com.yahoo.messagebus.test.SimpleProtocol;
+import org.junit.Test;
+
+import java.util.Arrays;
+
+import static org.junit.Assert.*;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public class ErrorTestCase {
+
+ @Test
+ public void requireThatAccessorsWork() {
+ Error err = new Error(69, "foo");
+ assertEquals(69, err.getCode());
+ assertEquals("foo", err.getMessage());
+
+ assertFalse(new Error(ErrorCode.TRANSIENT_ERROR, "foo").isFatal());
+ assertFalse(new Error(ErrorCode.TRANSIENT_ERROR + 1, "foo").isFatal());
+ assertTrue(new Error(ErrorCode.FATAL_ERROR, "foo").isFatal());
+ assertTrue(new Error(ErrorCode.FATAL_ERROR + 1, "foo").isFatal());
+ }
+
+ @Test
+ public void requireThatErrorIsPropagated() throws Exception {
+ RoutingTableSpec table = new RoutingTableSpec(SimpleProtocol.NAME);
+ table.addHop("itr", "test/itr/session", Arrays.asList("test/itr/session"));
+ table.addHop("dst", "test/dst/session", Arrays.asList("test/dst/session"));
+ table.addRoute("test", Arrays.asList("itr", "dst"));
+
+ Slobrok slobrok = new Slobrok();
+ TestServer src = new TestServer("test/src", table, slobrok, null, null);
+ TestServer itr = new TestServer("test/itr", table, slobrok, null, null);
+ TestServer dst = new TestServer("test/dst", table, slobrok, null, null);
+
+ Receptor ss_rr = new Receptor();
+ SourceSession ss = src.mb.createSourceSession(ss_rr);
+
+ Receptor is_mr = new Receptor();
+ Receptor is_rr = new Receptor();
+ IntermediateSession is = itr.mb.createIntermediateSession("session", true, is_mr, is_rr);
+
+ Receptor ds_mr = new Receptor();
+ DestinationSession ds = dst.mb.createDestinationSession("session", true, ds_mr);
+
+ src.waitSlobrok("test/itr/session", 1);
+ src.waitSlobrok("test/dst/session", 1);
+ itr.waitSlobrok("test/dst/session", 1);
+
+ for (int i = 0; i < 5; i++) {
+ assertTrue(ss.send(new SimpleMessage("msg"), "test").isAccepted());
+ Message msg = is_mr.getMessage(60);
+ assertNotNull(msg);
+ is.forward(msg);
+
+ assertNotNull(msg = ds_mr.getMessage(60));
+ Reply reply = new EmptyReply();
+ msg.swapState(reply);
+ reply.addError(new Error(ErrorCode.APP_FATAL_ERROR, "fatality"));
+ ds.reply(reply);
+
+ assertNotNull(reply = is_rr.getReply(60));
+ assertEquals(reply.getNumErrors(), 1);
+ assertEquals(reply.getError(0).getService(), "test/dst/session");
+ reply.addError(new Error(ErrorCode.APP_FATAL_ERROR, "fatality"));
+ is.forward(reply);
+
+ assertNotNull(reply = ss_rr.getReply(60));
+ assertEquals(reply.getNumErrors(), 2);
+ assertEquals(reply.getError(0).getService(), "test/dst/session");
+ assertEquals(reply.getError(1).getService(), "test/itr/session");
+ }
+
+ ss.destroy();
+ is.destroy();
+ ds.destroy();
+
+ dst.destroy();
+ itr.destroy();
+ src.destroy();
+ slobrok.stop();
+ }
+}
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/MessageBusTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/MessageBusTestCase.java new file mode 100644 index 00000000000..95b4e3663e8 --- /dev/null +++ b/messagebus/src/test/java/com/yahoo/messagebus/MessageBusTestCase.java @@ -0,0 +1,144 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus; + +import com.yahoo.jrt.ListenFailedException; +import com.yahoo.jrt.Spec; +import com.yahoo.jrt.slobrok.server.Slobrok; +import com.yahoo.messagebus.network.rpc.RPCNetwork; +import com.yahoo.messagebus.network.rpc.RPCNetworkParams; +import com.yahoo.messagebus.network.rpc.test.TestServer; +import com.yahoo.messagebus.routing.RetryTransientErrorsPolicy; +import com.yahoo.messagebus.routing.Route; +import com.yahoo.messagebus.routing.RoutingPolicy; +import com.yahoo.messagebus.routing.test.CustomPolicyFactory; +import com.yahoo.messagebus.test.Receptor; +import com.yahoo.messagebus.test.SimpleMessage; +import com.yahoo.messagebus.test.SimpleProtocol; +import org.junit.Test; + +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.*; + +public class MessageBusTestCase { + + @Test + public void requireThatBucketSequencingWithResenderEnabledCausesError() throws ListenFailedException { + Slobrok slobrok = new Slobrok(); + TestServer server = new TestServer(new MessageBusParams() + .addProtocol(new SimpleProtocol()) + .setRetryPolicy(new RetryTransientErrorsPolicy()), + new RPCNetworkParams() + .setSlobrokConfigId(slobrok.configId())); + Receptor receptor = new Receptor(); + SourceSession session = server.mb.createSourceSession( + new SourceSessionParams().setTimeout(600.0).setReplyHandler(receptor)); + assertTrue(session.send(new SimpleMessage("foo") { + @Override + public boolean hasBucketSequence() { + return true; + } + }.setRoute(Route.parse("bar"))).isAccepted()); + Reply reply = receptor.getReply(60); + assertNotNull(reply); + assertEquals(1, reply.getNumErrors()); + assertEquals(ErrorCode.SEQUENCE_ERROR, reply.getError(0).getCode()); + session.destroy(); + server.destroy(); + slobrok.stop(); + } + + @Test + public void testConnectionSpec() throws ListenFailedException, UnknownHostException { + // Setup servers and sessions. + Slobrok slobrok = new Slobrok(); + List<TestServer> servers = new ArrayList<>(); + + TestServer srcServer = new TestServer("feeder", null, slobrok, null, null); + servers.add(srcServer); + SourceSession src = servers.get(0).mb.createSourceSession(new Receptor()); + + List<IntermediateSession> sessions = new ArrayList<>(); + for (int i = 0; i < 10; ++i) { + TestServer server = new TestServer("intermediate/" + i, null, slobrok, null, null); + servers.add(server); + sessions.add(server.mb.createIntermediateSession("session", true, new Receptor(), new Receptor())); + } + + TestServer dstServer = new TestServer("destination", null, slobrok, null, null); + DestinationSession dst = dstServer.mb.createDestinationSession("session", true, new Receptor()); + + assertTrue(srcServer.waitSlobrok("intermediate/*/session", sessions.size())); + assertTrue(srcServer.waitSlobrok("destination/session", 1)); + + StringBuilder route = new StringBuilder(); + for (int i = 0; i < sessions.size(); i++) { + route.append("intermediate/").append(i).append("/session "); + route.append(sessions.get(i).getConnectionSpec()).append(" "); + } + route.append(dst.getConnectionSpec()); + + Message msg = new SimpleMessage("empty"); + assertTrue(src.send(msg, Route.parse(route.toString())).isAccepted()); + for (IntermediateSession itr : sessions) { + // Received using session name. + assertNotNull(msg = ((Receptor)itr.getMessageHandler()).getMessage(60)); + itr.forward(msg); + + // Received using connection spec. + assertNotNull(msg = ((Receptor)itr.getMessageHandler()).getMessage(60)); + itr.forward(msg); + } + assertNotNull(msg = ((Receptor)dst.getMessageHandler()).getMessage(60)); + dst.acknowledge(msg); + for (int i = sessions.size(); --i >= 0;) { + IntermediateSession itr = sessions.get(i); + + // Received for connection spec. + Reply reply = ((Receptor)itr.getReplyHandler()).getReply(60); + assertNotNull(reply); + itr.forward(reply); + + // Received for session name. + assertNotNull(reply = ((Receptor)itr.getReplyHandler()).getReply(60)); + itr.forward(reply); + } + assertNotNull(((Receptor)src.getReplyHandler()).getReply(60)); + + // Cleanup. + for (IntermediateSession session : sessions) { + session.destroy(); + } + for (TestServer server : servers) { + server.destroy(); + } + slobrok.stop(); + } + + @Test + public void testRoutingPolicyCache() throws ListenFailedException, UnknownHostException { + Slobrok slobrok = new Slobrok(); + String config = "slobrok[1]\nslobrok[0].connectionspec \"" + new Spec("localhost", slobrok.port()).toString() + "\"\n"; + SimpleProtocol protocol = new SimpleProtocol(); + protocol.addPolicyFactory("Custom", new CustomPolicyFactory()); + MessageBus bus = new MessageBus(new RPCNetwork(new RPCNetworkParams().setSlobrokConfigId("raw:" + config)), + new MessageBusParams().addProtocol(protocol)); + + RoutingPolicy all = bus.getRoutingPolicy(SimpleProtocol.NAME, "Custom", null); + assertNotNull(all); + + RoutingPolicy ref = bus.getRoutingPolicy(SimpleProtocol.NAME, "Custom", null); + assertNotNull(ref); + assertSame(all, ref); + + RoutingPolicy allArg = bus.getRoutingPolicy(SimpleProtocol.NAME, "Custom", "Arg"); + assertNotNull(allArg); + assertNotSame(all, allArg); + + RoutingPolicy refArg = bus.getRoutingPolicy(SimpleProtocol.NAME, "Custom", "Arg"); + assertNotNull(refArg); + assertSame(allArg, refArg); + } +} diff --git a/messagebus/src/test/java/com/yahoo/messagebus/MessengerTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/MessengerTestCase.java new file mode 100644 index 00000000000..37a99381cd1 --- /dev/null +++ b/messagebus/src/test/java/com/yahoo/messagebus/MessengerTestCase.java @@ -0,0 +1,124 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus; + +import org.junit.Test; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertTrue; + +/** + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +public class MessengerTestCase { + + @Test + public void requireThatSyncWithSelfDoesNotCauseDeadLock() throws InterruptedException { + final Messenger msn = new Messenger(); + msn.start(); + + final CountDownLatch latch = new CountDownLatch(1); + msn.enqueue(new Messenger.Task() { + + @Override + public void run() { + msn.sync(); + } + + @Override + public void destroy() { + latch.countDown(); + } + }); + assertTrue(latch.await(60, TimeUnit.SECONDS)); + } + + @Test + public void requireThatTaskIsExecuted() throws InterruptedException { + Messenger msn = new Messenger(); + msn.start(); + assertTrue(tryMessenger(msn)); + } + + @Test + public void requireThatRunExceptionIsCaught() throws InterruptedException { + Messenger msn = new Messenger(); + msn.start(); + msn.enqueue(new Messenger.Task() { + @Override + public void run() { + throw new RuntimeException(); + } + + @Override + public void destroy() { + + } + }); + assertTrue(tryMessenger(msn)); + } + + @Test + public void requireThatDestroyExceptionIsCaught() throws InterruptedException { + Messenger msn = new Messenger(); + msn.start(); + msn.enqueue(new Messenger.Task() { + @Override + public void run() { + + } + + @Override + public void destroy() { + throw new RuntimeException(); + } + }); + assertTrue(tryMessenger(msn)); + } + + @Test + public void requireThatRunAndDestroyExceptionsAreCaught() throws InterruptedException { + Messenger msn = new Messenger(); + msn.start(); + msn.enqueue(new Messenger.Task() { + @Override + public void run() { + throw new RuntimeException(); + } + + @Override + public void destroy() { + throw new RuntimeException(); + } + }); + assertTrue(tryMessenger(msn)); + } + + private static boolean tryMessenger(Messenger msn) { + MyTask task = new MyTask(); + msn.enqueue(task); + try { + return task.runLatch.await(60, TimeUnit.SECONDS) && + task.destroyLatch.await(60, TimeUnit.SECONDS); + } catch (InterruptedException e) { + return false; + } + } + + private static class MyTask implements Messenger.Task { + + final CountDownLatch runLatch = new CountDownLatch(1); + final CountDownLatch destroyLatch = new CountDownLatch(1); + + @Override + public void run() { + runLatch.countDown(); + } + + @Override + public void destroy() { + destroyLatch.countDown(); + } + } +} diff --git a/messagebus/src/test/java/com/yahoo/messagebus/ProtocolRepositoryTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/ProtocolRepositoryTestCase.java new file mode 100644 index 00000000000..ddb21baadab --- /dev/null +++ b/messagebus/src/test/java/com/yahoo/messagebus/ProtocolRepositoryTestCase.java @@ -0,0 +1,103 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus; + +import com.yahoo.messagebus.routing.RoutingContext; +import com.yahoo.messagebus.routing.RoutingPolicy; +import com.yahoo.messagebus.test.SimpleProtocol; +import org.junit.Test; + +import static org.junit.Assert.*; + +/** + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +public class ProtocolRepositoryTestCase { + + @Test + public void requireThatPolicyCanBeNull() { + ProtocolRepository repo = new ProtocolRepository(); + SimpleProtocol protocol = new SimpleProtocol(); + repo.putProtocol(protocol); + assertNull(repo.getRoutingPolicy(SimpleProtocol.NAME, "Custom", null)); + } + + @Test + public void requireThatPolicyCanBeCreated() { + ProtocolRepository repo = new ProtocolRepository(); + SimpleProtocol protocol = new SimpleProtocol(); + protocol.addPolicyFactory("Custom", new MyFactory()); + repo.putProtocol(protocol); + assertNotNull(repo.getRoutingPolicy(SimpleProtocol.NAME, "Custom", null)); + } + + @Test + public void requireThatPolicyIsCached() { + ProtocolRepository repo = new ProtocolRepository(); + SimpleProtocol protocol = new SimpleProtocol(); + protocol.addPolicyFactory("Custom", new MyFactory()); + repo.putProtocol(protocol); + + RoutingPolicy prev = repo.getRoutingPolicy(SimpleProtocol.NAME, "Custom", null); + assertNotNull(prev); + + RoutingPolicy next = repo.getRoutingPolicy(SimpleProtocol.NAME, "Custom", null); + assertNotNull(next); + assertSame(prev, next); + } + + @Test + public void requireThatPolicyParamIsPartOfCacheKey() { + ProtocolRepository repo = new ProtocolRepository(); + SimpleProtocol protocol = new SimpleProtocol(); + protocol.addPolicyFactory("Custom", new MyFactory()); + repo.putProtocol(protocol); + + RoutingPolicy prev = repo.getRoutingPolicy(SimpleProtocol.NAME, "Custom", "foo"); + assertNotNull(prev); + + RoutingPolicy next = repo.getRoutingPolicy(SimpleProtocol.NAME, "Custom", "bar"); + assertNotNull(next); + assertNotSame(prev, next); + } + + @Test + public void requireThatCreatePolicyExceptionIsCaught() { + ProtocolRepository repo = new ProtocolRepository(); + SimpleProtocol protocol = new SimpleProtocol(); + protocol.addPolicyFactory("Custom", new SimpleProtocol.PolicyFactory() { + + @Override + public RoutingPolicy create(String param) { + throw new RuntimeException(); + } + }); + repo.putProtocol(protocol); + assertNull(repo.getRoutingPolicy(SimpleProtocol.NAME, "Custom", null)); + } + + private static class MyFactory implements SimpleProtocol.PolicyFactory { + + @Override + public RoutingPolicy create(String param) { + return new MyPolicy(); + } + } + + private static class MyPolicy implements RoutingPolicy { + + @Override + public void select(RoutingContext context) { + + } + + @Override + public void merge(RoutingContext context) { + + } + + @Override + public void destroy() { + + } + } +} diff --git a/messagebus/src/test/java/com/yahoo/messagebus/RateThrottlingTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/RateThrottlingTestCase.java new file mode 100644 index 00000000000..dc4beb5cf9f --- /dev/null +++ b/messagebus/src/test/java/com/yahoo/messagebus/RateThrottlingTestCase.java @@ -0,0 +1,39 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus; + +import com.yahoo.messagebus.test.SimpleMessage; +import junit.framework.TestCase; + +public class RateThrottlingTestCase extends TestCase { + + public void testPending() { + CustomTimer timer = new CustomTimer(); + RateThrottlingPolicy policy = new RateThrottlingPolicy(5.0, timer); + policy.setMaxPendingCount(200); + + // Check that we obey the max still. + assertFalse(policy.canSend(new SimpleMessage("test"), 300)); + } + + public int getActualRate(double desiredRate) { + CustomTimer timer = new CustomTimer(); + RateThrottlingPolicy policy = new RateThrottlingPolicy(desiredRate, timer); + + int ok = 0; + for (int i = 0; i < 10000; ++i) { + if (policy.canSend(new SimpleMessage("test"), 0)) { + ok++; + } + timer.millis += 10; + } + + return ok; + } + + public void testRates() { + assertEquals(10, getActualRate(0.1), 1); + assertEquals(1000, getActualRate(10), 100); + assertEquals(500, getActualRate(5), 50); + assertEquals(100, getActualRate(1), 10); + } +} diff --git a/messagebus/src/test/java/com/yahoo/messagebus/RoutableTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/RoutableTestCase.java new file mode 100755 index 00000000000..4e76ab86889 --- /dev/null +++ b/messagebus/src/test/java/com/yahoo/messagebus/RoutableTestCase.java @@ -0,0 +1,114 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus; + +import com.yahoo.jrt.ListenFailedException; +import com.yahoo.jrt.slobrok.server.Slobrok; +import com.yahoo.messagebus.network.rpc.test.TestServer; +import com.yahoo.messagebus.routing.Route; +import com.yahoo.messagebus.test.Receptor; +import com.yahoo.messagebus.test.SimpleMessage; +import com.yahoo.messagebus.test.SimpleReply; + +import java.net.UnknownHostException; + +public class RoutableTestCase extends junit.framework.TestCase { + + public void testMessageContext() throws ListenFailedException, UnknownHostException { + Slobrok slobrok = new Slobrok(); + TestServer srcServer = new TestServer("src", null, slobrok, null, null); + TestServer dstServer = new TestServer("dst", null, slobrok, null, null); + SourceSession srcSession = srcServer.mb.createSourceSession( + new Receptor(), + new SourceSessionParams().setTimeout(600.0)); + DestinationSession dstSession = dstServer.mb.createDestinationSession("session", true, new Receptor()); + + assertTrue(srcServer.waitSlobrok("dst/session", 1)); + + Object context = new Object(); + Message msg = new SimpleMessage("msg"); + msg.setContext(context); + assertTrue(srcSession.send(msg, "dst/session", true).isAccepted()); + + assertNotNull(msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60)); + dstSession.acknowledge(msg); + + Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60); + assertNotNull(reply); + assertSame(reply.getContext(), context); + + srcSession.destroy(); + srcServer.destroy(); + dstSession.destroy(); + dstServer.destroy(); + slobrok.stop(); + } + + public void testMessageSwapState() { + Message foo = new SimpleMessage("foo"); + Route fooRoute = Route.parse("foo"); + foo.setRoute(fooRoute); + foo.setRetry(1); + foo.setTimeReceivedNow(); + foo.setTimeRemaining(2); + + Message bar = new SimpleMessage("bar"); + Route barRoute = Route.parse("bar"); + bar.setRoute(barRoute); + bar.setRetry(3); + bar.setTimeReceivedNow(); + bar.setTimeRemaining(4); + + foo.swapState(bar); + assertEquals(barRoute, foo.getRoute()); + assertEquals(fooRoute, bar.getRoute()); + assertEquals(3, foo.getRetry()); + assertEquals(1, bar.getRetry()); + assertTrue(foo.getTimeReceived() >= bar.getTimeReceived()); + assertEquals(4, foo.getTimeRemaining()); + assertEquals(2, bar.getTimeRemaining()); + } + + public void testReplySwapState() { + Reply foo = new SimpleReply("foo"); + Message fooMsg = new SimpleMessage("foo"); + foo.setMessage(fooMsg); + foo.setRetryDelay(1); + foo.addError(new Error(ErrorCode.APP_FATAL_ERROR, "fatal")); + foo.addError(new Error(ErrorCode.APP_TRANSIENT_ERROR, "transient")); + + Reply bar = new SimpleReply("bar"); + Message barMsg = new SimpleMessage("bar"); + bar.setMessage(barMsg); + bar.setRetryDelay(2); + bar.addError(new Error(ErrorCode.ERROR_LIMIT, "err")); + + foo.swapState(bar); + assertEquals(barMsg, foo.getMessage()); + assertEquals(fooMsg, bar.getMessage()); + assertEquals(2.0, foo.getRetryDelay()); + assertEquals(1.0, bar.getRetryDelay()); + assertEquals(1, foo.getNumErrors()); + assertEquals(2, bar.getNumErrors()); + } + + public void testMessageDiscard() { + Receptor handler = new Receptor(); + Message msg = new SimpleMessage("foo"); + msg.pushHandler(handler); + msg.discard(); + + assertNull(handler.getReply(0)); + } + + public void testReplyDiscard() { + Receptor handler = new Receptor(); + Message msg = new SimpleMessage("foo"); + msg.pushHandler(handler); + + Reply reply = new SimpleReply("bar"); + reply.swapState(msg); + reply.discard(); + + assertNull(handler.getReply(0)); + } +} diff --git a/messagebus/src/test/java/com/yahoo/messagebus/SendProxyTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/SendProxyTestCase.java new file mode 100644 index 00000000000..6eb7257328b --- /dev/null +++ b/messagebus/src/test/java/com/yahoo/messagebus/SendProxyTestCase.java @@ -0,0 +1,169 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus;
+
+import com.yahoo.component.Vtag;
+import com.yahoo.jrt.ListenFailedException;
+import com.yahoo.jrt.slobrok.server.Slobrok;
+import com.yahoo.log.LogLevel;
+import com.yahoo.messagebus.network.Identity;
+import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
+import com.yahoo.messagebus.network.rpc.test.TestServer;
+import com.yahoo.messagebus.routing.Route;
+import com.yahoo.messagebus.test.Receptor;
+import com.yahoo.messagebus.test.SimpleMessage;
+import com.yahoo.messagebus.test.SimpleProtocol;
+import com.yahoo.messagebus.test.SimpleReply;
+import junit.framework.TestCase;
+
+import java.net.UnknownHostException;
+import java.util.logging.Handler;
+import java.util.logging.LogRecord;
+import java.util.logging.Logger;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public class SendProxyTestCase extends TestCase {
+
+ Slobrok slobrok;
+ TestServer srcServer, dstServer;
+ SourceSession srcSession;
+ DestinationSession dstSession;
+
+ @Override
+ public void setUp() throws UnknownHostException, ListenFailedException {
+ slobrok = new Slobrok();
+ dstServer = new TestServer(new MessageBusParams().addProtocol(new SimpleProtocol()),
+ new RPCNetworkParams().setIdentity(new Identity("dst")).setSlobrokConfigId(TestServer.getSlobrokConfig(slobrok)));
+ dstSession = dstServer.mb.createDestinationSession(new DestinationSessionParams().setName("session").setMessageHandler(new Receptor()));
+ srcServer = new TestServer(new MessageBusParams().addProtocol(new SimpleProtocol()),
+ new RPCNetworkParams().setSlobrokConfigId(TestServer.getSlobrokConfig(slobrok)));
+ srcSession = srcServer.mb.createSourceSession(
+ new SourceSessionParams().setTimeout(600.0).setThrottlePolicy(null).setReplyHandler(new Receptor()));
+ assertTrue(srcServer.waitSlobrok("dst/session", 1));
+ }
+
+ @Override
+ public void tearDown() {
+ slobrok.stop();
+ dstSession.destroy();
+ dstServer.destroy();
+ srcSession.destroy();
+ srcServer.destroy();
+ }
+
+ public void testTraceByLogLevel() {
+ Logger log = Logger.getLogger(SendProxy.class.getName());
+ LogHandler logHandler = new LogHandler();
+ log.addHandler(logHandler);
+
+ log.setLevel(LogLevel.INFO);
+ sendMessage(0, null);
+ assertNull(logHandler.trace);
+
+ log.setLevel(LogLevel.DEBUG);
+ sendMessage(0, null);
+ assertNull(logHandler.trace);
+
+ sendMessage(1, new Error(ErrorCode.FATAL_ERROR, "err"));
+ assertNull(logHandler.trace);
+
+ sendMessage(0, new Error(ErrorCode.FATAL_ERROR, "err"));
+ assertEquals("Trace for reply with error(s):\n" +
+ "<trace>\n" +
+ " <trace>\n" +
+ " Sending message (version ${VERSION}) from client to 'dst/session' with x seconds timeout.\n" +
+ " <trace>\n" +
+ " Message (type 1) received at 'dst' for session 'session'.\n" +
+ " [FATAL_ERROR @ localhost]: err\n" +
+ " Sending reply (version ${VERSION}) from 'dst'.\n" +
+ " </trace>\n" +
+ " Reply (type 2) received at client.\n" +
+ " </trace>\n" +
+ "</trace>\n", logHandler.trace);
+ logHandler.trace = null;
+
+ log.setLevel(LogLevel.SPAM);
+ sendMessage(1, null);
+ assertNull(logHandler.trace);
+
+ sendMessage(0, null);
+ assertEquals("Trace for reply:\n" +
+ "<trace>\n" +
+ " <trace>\n" +
+ " Sending message (version ${VERSION}) from client to 'dst/session' with x seconds timeout.\n" +
+ " <trace>\n" +
+ " Message (type 1) received at 'dst' for session 'session'.\n" +
+ " Sending reply (version ${VERSION}) from 'dst'.\n" +
+ " </trace>\n" +
+ " Reply (type 0) received at client.\n" +
+ " </trace>\n" +
+ "</trace>\n", logHandler.trace);
+ logHandler.trace = null;
+
+ sendMessage(1, new Error(ErrorCode.FATAL_ERROR, "err"));
+ assertNull(logHandler.trace);
+
+ sendMessage(0, new Error(ErrorCode.FATAL_ERROR, "err"));
+ assertEquals("Trace for reply with error(s):\n" +
+ "<trace>\n" +
+ " <trace>\n" +
+ " Sending message (version ${VERSION}) from client to 'dst/session' with x seconds timeout.\n" +
+ " <trace>\n" +
+ " Message (type 1) received at 'dst' for session 'session'.\n" +
+ " [FATAL_ERROR @ localhost]: err\n" +
+ " Sending reply (version ${VERSION}) from 'dst'.\n" +
+ " </trace>\n" +
+ " Reply (type 2) received at client.\n" +
+ " </trace>\n" +
+ "</trace>\n", logHandler.trace);
+ logHandler.trace = null;
+ }
+
+ private void sendMessage(int traceLevel, Error err) {
+ Message msg = new SimpleMessage("foo");
+ msg.getTrace().setLevel(traceLevel);
+ assertTrue(srcSession.send(msg, Route.parse("dst/session")).isAccepted());
+ assertNotNull(msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60));
+ if (err != null) {
+ Reply reply = new SimpleReply("bar");
+ reply.swapState(msg);
+ reply.addError(err);
+ dstSession.reply(reply);
+ } else {
+ dstSession.acknowledge(msg);
+ }
+ Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60);
+ assertNotNull(reply);
+ }
+
+ private static class LogHandler extends Handler {
+
+ String trace = null;
+
+ @Override
+ public void publish(LogRecord record) {
+ String msg = record.getMessage();
+ if (msg.startsWith("Trace ")) {
+ msg = msg.replaceAll("\\[.*\\] ", "");
+ msg = msg.replaceAll("[0-9]+\\.[0-9]+ seconds", "x seconds");
+
+ String ver = Vtag.currentVersion.toString();
+ for (int i = msg.indexOf(ver); i >= 0; i = msg.indexOf(ver, i)) {
+ msg = msg.substring(0, i) + "${VERSION}" + msg.substring(i + ver.length());
+ }
+ trace = msg;
+ }
+ }
+
+ @Override
+ public void flush() {
+ // empty
+ }
+
+ @Override
+ public void close() throws SecurityException {
+ // empty
+ }
+ }
+}
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/SequencerTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/SequencerTestCase.java new file mode 100644 index 00000000000..bd053e5ad63 --- /dev/null +++ b/messagebus/src/test/java/com/yahoo/messagebus/SequencerTestCase.java @@ -0,0 +1,179 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus; + +import com.yahoo.messagebus.test.SimpleMessage; + +import java.util.LinkedList; +import java.util.Queue; + +/** + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +public class SequencerTestCase extends junit.framework.TestCase { + + public void testSyncNone() { + TestQueue src = new TestQueue(); + TestQueue dst = new TestQueue(); + QueueSender sender = new QueueSender(dst); + Sequencer seq = new Sequencer(sender); + + seq.handleMessage(src.createMessage(false, 0)); + seq.handleMessage(src.createMessage(false, 0)); + seq.handleMessage(src.createMessage(false, 0)); + seq.handleMessage(src.createMessage(false, 0)); + seq.handleMessage(src.createMessage(false, 0)); + assertEquals(0, src.size()); + assertEquals(5, dst.size()); + + dst.replyNext(); + dst.replyNext(); + dst.replyNext(); + dst.replyNext(); + dst.replyNext(); + assertEquals(5, src.size()); + assertEquals(0, dst.size()); + + src.checkReply(false, 0); + src.checkReply(false, 0); + src.checkReply(false, 0); + src.checkReply(false, 0); + src.checkReply(false, 0); + assertEquals(0, src.size()); + assertEquals(0, dst.size()); + } + + public void testSyncId() { + TestQueue src = new TestQueue(); + TestQueue dst = new TestQueue(); + QueueSender sender = new QueueSender(dst); + Sequencer seq = new Sequencer(sender); + + seq.handleMessage(src.createMessage(true, 1L)); + seq.handleMessage(src.createMessage(true, 2L)); + seq.handleMessage(src.createMessage(true, 3L)); + seq.handleMessage(src.createMessage(true, 4L)); + seq.handleMessage(src.createMessage(true, 5L)); + assertEquals(0, src.size()); + assertEquals(5, dst.size()); + + seq.handleMessage(src.createMessage(true, 1L)); + seq.handleMessage(src.createMessage(true, 5L)); + seq.handleMessage(src.createMessage(true, 2L)); + seq.handleMessage(src.createMessage(true, 10L)); + seq.handleMessage(src.createMessage(true, 4L)); + seq.handleMessage(src.createMessage(true, 3L)); + assertEquals(0, src.size()); + assertEquals(6, dst.size()); + + dst.replyNext(); + dst.replyNext(); + dst.replyNext(); + dst.replyNext(); + dst.replyNext(); + assertEquals(5, src.size()); + assertEquals(6, dst.size()); + + dst.replyNext(); + dst.replyNext(); + dst.replyNext(); + dst.replyNext(); + dst.replyNext(); + dst.replyNext(); + assertEquals(11, src.size()); + assertEquals(0, dst.size()); + + src.checkReply(true, 1); + src.checkReply(true, 2); + src.checkReply(true, 3); + src.checkReply(true, 4); + src.checkReply(true, 5); + src.checkReply(true, 10); + src.checkReply(true, 1); + src.checkReply(true, 2); + src.checkReply(true, 3); + src.checkReply(true, 4); + src.checkReply(true, 5); + assertEquals(0, src.size()); + assertEquals(0, dst.size()); + } + + @SuppressWarnings("serial") + private static class TestQueue extends LinkedList<Routable> implements ReplyHandler { + + void checkReply(boolean hasSeqId, long seqId) { + if (size() == 0) { + throw new IllegalStateException("No routable in queue."); + } + Routable obj = remove(); + assertTrue(obj instanceof Reply); + + Reply reply = (Reply)obj; + Message msg = reply.getMessage(); + assertNotNull(msg); + + assertEquals(hasSeqId, msg.hasSequenceId()); + if (hasSeqId) { + assertEquals(seqId, msg.getSequenceId()); + } + } + + public void handleReply(Reply reply) { + add(reply); + } + + void replyNext() { + Routable obj = remove(); + assertTrue(obj instanceof Message); + Message msg = (Message)obj; + + Reply reply = new EmptyReply(); + reply.swapState(msg); + reply.setMessage(msg); + ReplyHandler handler = reply.popHandler(); + handler.handleReply(reply); + } + + Message createMessage(final boolean hasSeqId, final long seqId) { + Message ret = new MyMessage(hasSeqId, seqId); + ret.pushHandler(this); + return ret; + } + } + + private static class QueueSender implements MessageHandler { + + Queue<Routable> queue; + + QueueSender(Queue<Routable> queue) { + this.queue = queue; + } + + @Override + public void handleMessage(Message msg) { + queue.offer(msg); + } + } + + private static class MyMessage extends SimpleMessage { + + final boolean hasSeqId; + final long seqId; + + MyMessage(boolean hasSeqId, long seqId) { + super("foo"); + this.hasSeqId = hasSeqId; + this.seqId = seqId; + } + + @Override + public boolean hasSequenceId() { + return hasSeqId; + } + + @Override + public long getSequenceId() { + return seqId; + } + } +} + diff --git a/messagebus/src/test/java/com/yahoo/messagebus/SimpleTripTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/SimpleTripTestCase.java new file mode 100755 index 00000000000..2795758d922 --- /dev/null +++ b/messagebus/src/test/java/com/yahoo/messagebus/SimpleTripTestCase.java @@ -0,0 +1,53 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus;
+
+import com.yahoo.jrt.ListenFailedException;
+import com.yahoo.jrt.slobrok.server.Slobrok;
+import com.yahoo.messagebus.network.Identity;
+import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
+import com.yahoo.messagebus.network.rpc.test.TestServer;
+import com.yahoo.messagebus.routing.Route;
+import com.yahoo.messagebus.test.Receptor;
+import com.yahoo.messagebus.test.SimpleMessage;
+import com.yahoo.messagebus.test.SimpleProtocol;
+import com.yahoo.messagebus.test.SimpleReply;
+
+import java.net.UnknownHostException;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public class SimpleTripTestCase extends junit.framework.TestCase {
+
+ public void testSimpleTrip() throws ListenFailedException, UnknownHostException {
+ Slobrok slobrok = new Slobrok();
+ TestServer server = new TestServer(new MessageBusParams().addProtocol(new SimpleProtocol()),
+ new RPCNetworkParams()
+ .setIdentity(new Identity("srv"))
+ .setSlobrokConfigId(TestServer.getSlobrokConfig(slobrok)));
+ DestinationSession dst = server.mb.createDestinationSession(new DestinationSessionParams().setName("session").setMessageHandler(new Receptor()));
+ SourceSession src = server.mb.createSourceSession(
+ new SourceSessionParams().setTimeout(600.0).setReplyHandler(new Receptor()));
+ assertTrue(server.waitSlobrok("srv/session", 1));
+
+ assertTrue(src.send(new SimpleMessage("msg"), Route.parse("srv/session")).isAccepted());
+ Message msg = ((Receptor)dst.getMessageHandler()).getMessage(60);
+ assertNotNull(msg);
+ assertEquals(SimpleProtocol.NAME, msg.getProtocol());
+ assertEquals(SimpleProtocol.MESSAGE, msg.getType());
+ assertEquals("msg", ((SimpleMessage)msg).getValue());
+
+ Reply reply = new SimpleReply("reply");
+ reply.swapState(msg);
+ dst.reply(reply);
+
+ assertNotNull(reply = ((Receptor)src.getReplyHandler()).getReply(60));
+ assertEquals(SimpleProtocol.NAME, reply.getProtocol());
+ assertEquals(SimpleProtocol.REPLY, reply.getType());
+ assertEquals("reply", ((SimpleReply)reply).getValue());
+
+ src.destroy();
+ dst.destroy();
+ server.destroy();
+ }
+}
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/ThrottlerTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/ThrottlerTestCase.java new file mode 100644 index 00000000000..405aa1c0b96 --- /dev/null +++ b/messagebus/src/test/java/com/yahoo/messagebus/ThrottlerTestCase.java @@ -0,0 +1,230 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus; + +import com.yahoo.jrt.ListenFailedException; +import com.yahoo.jrt.slobrok.server.Slobrok; +import com.yahoo.messagebus.network.rpc.test.TestServer; +import com.yahoo.messagebus.routing.RoutingTableSpec; +import com.yahoo.messagebus.test.*; + +import java.net.UnknownHostException; +import java.util.Arrays; + +/** + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +public class ThrottlerTestCase extends junit.framework.TestCase { + + //////////////////////////////////////////////////////////////////////////////// + // + // Setup + // + //////////////////////////////////////////////////////////////////////////////// + + Slobrok slobrok; + TestServer src, dst; + + public void setUp() throws ListenFailedException, UnknownHostException { + RoutingTableSpec table = new RoutingTableSpec(SimpleProtocol.NAME); + table.addHop("dst", "test/dst/session", Arrays.asList("test/dst/session")); + table.addRoute("test", Arrays.asList("dst")); + slobrok = new Slobrok(); + src = new TestServer("test/src", table, slobrok, null, null); + dst = new TestServer("test/dst", table, slobrok, null, null); + } + + public void tearDown() { + dst.destroy(); + src.destroy(); + slobrok.stop(); + } + + //////////////////////////////////////////////////////////////////////////////// + // + // Tests + // + //////////////////////////////////////////////////////////////////////////////// + + public void testMaxCount() { + // Prepare a source session with throttle enabled. + SourceSessionParams params = new SourceSessionParams().setTimeout(600.0); + StaticThrottlePolicy policy = new StaticThrottlePolicy(); + policy.setMaxPendingCount(10); + params.setThrottlePolicy(policy); + + Receptor src_rr = new Receptor(); + SourceSession src_s = src.mb.createSourceSession(src_rr, params); + + // Prepare a destination session to acknowledge messages. + QueueAdapter dst_q = new QueueAdapter(); + DestinationSession dst_s = dst.mb.createDestinationSession("session", true, dst_q); + src.waitSlobrok("test/dst/session", 1); + + // Send until throttler rejects a message. + for (int i = 0; i < policy.getMaxPendingCount(); i++) { + assertTrue(src_s.send(new SimpleMessage("msg"), "test").isAccepted()); + } + assertFalse(src_s.send(new SimpleMessage("msg"), "test").isAccepted()); + + // Acknowledge one message at a time, then attempt to send two more. + for (int i = 0; i < 10; i++) { + assertTrue(dst_q.waitSize(policy.getMaxPendingCount(), 60)); + dst_s.acknowledge((Message)dst_q.dequeue()); + + assertNotNull(src_rr.getReply(60)); + assertTrue(src_s.send(new SimpleMessage("msg"), "test").isAccepted()); + assertFalse(src_s.send(new SimpleMessage("msg"), "test").isAccepted()); + } + + assertTrue(dst_q.waitSize(policy.getMaxPendingCount(), 60)); + while (!dst_q.isEmpty()) { + dst_s.acknowledge((Message)dst_q.dequeue()); + } + + src_s.close(); + dst_s.destroy(); + } + + public void testMaxSize() { + // Prepare a source session with throttle enabled. + SourceSessionParams params = new SourceSessionParams().setTimeout(600.0); + StaticThrottlePolicy policy = new StaticThrottlePolicy(); + policy.setMaxPendingCount(1000); + policy.setMaxPendingSize(2); + params.setThrottlePolicy(policy); + + Receptor src_rr = new Receptor(); + SourceSession src_s = src.mb.createSourceSession(src_rr, params); + + // Prepare a destination session to acknowledge messages. + QueueAdapter dst_q = new QueueAdapter(); + DestinationSession dst_s = dst.mb.createDestinationSession("session", true, dst_q); + src.waitSlobrok("test/dst/session", 1); + + assertTrue(src_s.send(new SimpleMessage("1"), "test").isAccepted()); + assertTrue(dst_q.waitSize(1, 60)); + assertTrue(src_s.send(new SimpleMessage("12"), "test").isAccepted()); + assertTrue(dst_q.waitSize(2, 60)); + + assertFalse(src_s.send(new SimpleMessage("1"), "test").isAccepted()); + dst_s.acknowledge((Message)dst_q.dequeue()); + assertNotNull(src_rr.getReply(60)); + + assertFalse(src_s.send(new SimpleMessage("1"), "test").isAccepted()); + dst_s.acknowledge((Message)dst_q.dequeue()); + assertNotNull(src_rr.getReply(60)); + + assertTrue(src_s.send(new SimpleMessage("12"), "test").isAccepted()); + assertTrue(dst_q.waitSize(1, 60)); + assertFalse(src_s.send(new SimpleMessage("1"), "test").isAccepted()); + dst_s.acknowledge((Message)dst_q.dequeue()); + assertNotNull(src_rr.getReply(60)); + + // Close sessions. + src_s.close(); + dst_s.destroy(); + } + + public void testDynamicWindowSize() { + CustomTimer timer = new CustomTimer(); + DynamicThrottlePolicy policy = new DynamicThrottlePolicy(timer); + + policy.setWindowSizeIncrement(5); + policy.setResizeRate(1); + + double windowSize = getWindowSize(policy, timer, 100); + assertTrue(windowSize >= 90 && windowSize <= 110); + + windowSize = getWindowSize(policy, timer, 200); + assertTrue(windowSize >= 90 && windowSize <= 210); + + windowSize = getWindowSize(policy, timer, 50); + assertTrue(windowSize >= 9 && windowSize <= 55); + + windowSize = getWindowSize(policy, timer, 500); + assertTrue(windowSize >= 90 && windowSize <= 505); + + windowSize = getWindowSize(policy, timer, 100); + assertTrue(windowSize >= 90 && windowSize <= 115); + } + + public void testIdleTimePeriod() { + CustomTimer timer = new CustomTimer(); + DynamicThrottlePolicy policy = new DynamicThrottlePolicy(timer); + + policy.setWindowSizeIncrement(5); + policy.setResizeRate(1); + + double windowSize = getWindowSize(policy, timer, 100); + assertTrue(windowSize >= 90 && windowSize <= 110); + + Message msg = new SimpleMessage("foo"); + timer.millis += 30 * 1000; + assertTrue(policy.canSend(msg, 0)); + assertTrue(windowSize >= 90 && windowSize <= 110); + + timer.millis += 60 * 1000 + 1; + assertTrue(policy.canSend(msg, 50)); + assertEquals(55, policy.getMaxPendingCount()); + + timer.millis += 60 * 1000 + 1; + assertTrue(policy.canSend(msg, 0)); + assertEquals(5, policy.getMaxPendingCount()); + + } + + public void testMinWindowSize() { + CustomTimer timer = new CustomTimer(); + DynamicThrottlePolicy policy = new DynamicThrottlePolicy(timer); + + policy.setWindowSizeIncrement(5); + policy.setResizeRate(1); + policy.setMinWindowSize(150); + + double windowSize = getWindowSize(policy, timer, 200); + assertTrue(windowSize >= 150 && windowSize <= 210); + } + + public void testMaxWindowSize() { + CustomTimer timer = new CustomTimer(); + DynamicThrottlePolicy policy = new DynamicThrottlePolicy(timer); + + policy.setWindowSizeIncrement(5); + policy.setResizeRate(1); + policy.setMaxWindowSize(50); + + double windowSize = getWindowSize(policy, timer, 100); + assertTrue(windowSize >= 40 && windowSize <= 50); + } + + + //////////////////////////////////////////////////////////////////////////////// + // + // Utilities + // + //////////////////////////////////////////////////////////////////////////////// + + private int getWindowSize(DynamicThrottlePolicy policy, CustomTimer timer, int maxPending) { + Message msg = new SimpleMessage("foo"); + Reply reply = new SimpleReply("bar"); + reply.setContext(1); + for (int i = 0; i < 999; ++i) { + int numPending = 0; + while (policy.canSend(msg, numPending)) { + policy.processMessage(msg); + ++numPending; + } + + long tripTime = (numPending < maxPending) ? 1000 : 1000 + (numPending - maxPending) * 1000; + timer.millis += tripTime; + + while (--numPending >= 0) { + policy.processReply(reply); + } + } + int ret = policy.getMaxPendingCount(); + System.out.println("getWindowSize() = " + ret); + return ret; + } + +} diff --git a/messagebus/src/test/java/com/yahoo/messagebus/TimeoutTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/TimeoutTestCase.java new file mode 100755 index 00000000000..05a14bf8d16 --- /dev/null +++ b/messagebus/src/test/java/com/yahoo/messagebus/TimeoutTestCase.java @@ -0,0 +1,102 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus; + +import com.yahoo.jrt.ListenFailedException; +import com.yahoo.jrt.slobrok.server.Slobrok; +import com.yahoo.messagebus.network.Identity; +import com.yahoo.messagebus.network.rpc.RPCNetworkParams; +import com.yahoo.messagebus.network.rpc.test.TestServer; +import com.yahoo.messagebus.routing.Route; +import com.yahoo.messagebus.test.Receptor; +import com.yahoo.messagebus.test.SimpleMessage; +import com.yahoo.messagebus.test.SimpleProtocol; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.net.UnknownHostException; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +/** + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +public class TimeoutTestCase { + + private final Slobrok slobrok; + private final TestServer srcServer, dstServer; + private final SourceSession srcSession; + private final DestinationSession dstSession; + + public TimeoutTestCase() throws ListenFailedException, UnknownHostException { + slobrok = new Slobrok(); + dstServer = new TestServer(new MessageBusParams().addProtocol(new SimpleProtocol()), + new RPCNetworkParams().setIdentity(new Identity("dst")) + .setSlobrokConfigId(TestServer.getSlobrokConfig(slobrok))); + dstSession = dstServer.mb.createDestinationSession(new DestinationSessionParams() + .setName("session") + .setMessageHandler(new Receptor())); + srcServer = new TestServer(new MessageBusParams().addProtocol(new SimpleProtocol()), + new RPCNetworkParams().setSlobrokConfigId(TestServer.getSlobrokConfig(slobrok))); + srcSession = srcServer.mb.createSourceSession( + new SourceSessionParams().setTimeout(600.0).setReplyHandler(new Receptor())); + } + + @Before + public void waitForSlobrokRegistration() { + assertTrue(srcServer.waitSlobrok("dst/session", 1)); + } + + @After + public void destroyResources() { + slobrok.stop(); + dstSession.destroy(); + dstServer.destroy(); + srcSession.destroy(); + srcServer.destroy(); + + Message msg = ((Receptor)dstSession.getMessageHandler()).getMessage(0); + if (msg != null) { + msg.discard(); + } + } + + @Test + public void requireThatMessageCanTimeout() throws ListenFailedException, UnknownHostException { + srcSession.setTimeout(1); + assertSend(srcSession, newMessage(), "dst/session"); + assertTimeout(((Receptor)srcSession.getReplyHandler()).getReply(60)); + } + + @Test + public void requireThatZeroTimeoutMeansImmediateTimeout() throws ListenFailedException, UnknownHostException { + srcSession.setTimeout(0); + assertSend(srcSession, newMessage(), "dst/session"); + assertTimeout(((Receptor)srcSession.getReplyHandler()).getReply(60)); + } + + private static void assertSend(SourceSession session, Message msg, String route) { + assertTrue(session.send(msg, Route.parse(route)).isAccepted()); + } + + private static void assertTimeout(Reply reply) { + assertNotNull(reply); + assertTrue(reply.getTrace().toString(), hasError(reply, ErrorCode.TIMEOUT)); + } + + private static Message newMessage() { + Message msg = new SimpleMessage("msg"); + msg.getTrace().setLevel(9); + return msg; + } + + private static boolean hasError(Reply reply, int errorCode) { + for (int i = 0; i < reply.getNumErrors(); ++i) { + if (reply.getError(i).getCode() == errorCode) { + return true; + } + } + return false; + } +} diff --git a/messagebus/src/test/java/com/yahoo/messagebus/TraceTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/TraceTestCase.java new file mode 100755 index 00000000000..c6b67087b7f --- /dev/null +++ b/messagebus/src/test/java/com/yahoo/messagebus/TraceTestCase.java @@ -0,0 +1,286 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus; + +/** + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +public class TraceTestCase extends junit.framework.TestCase { + + public void testEncodeDecode() { + assertEquals("()", TraceNode.decode("").encode()); + assertEquals("()", TraceNode.decode("[xyz").encode()); + assertEquals("([xyz][])", TraceNode.decode("[xyz][]").encode()); + assertEquals("[xyz]", TraceNode.decode("[xyz]").encode()); + assertEquals("()", TraceNode.decode("{()").encode()); + assertEquals("({()}{})", TraceNode.decode("{()}{}").encode()); + assertEquals("{()}", TraceNode.decode("{()}").encode()); + assertEquals("()", TraceNode.decode("({}").encode()); + assertEquals("(({})())", TraceNode.decode("({})()").encode()); + assertEquals("([])", TraceNode.decode("([])").encode()); + + assertTrue(TraceNode.decode("").isEmpty()); + assertTrue(!TraceNode.decode("([note])").isEmpty()); + + String str = + "([[17/Jun/2009:09:02:30 +0200\\] Message (type 1) received at 'dst' for session 'session'.]" + + "[[17/Jun/2009:09:02:30 +0200\\] [APP_TRANSIENT_ERROR @ localhost\\]: err1]" + + "[[17/Jun/2009:09:02:30 +0200\\] Sending reply (version 4.2) from 'dst'.])"; + System.out.println(TraceNode.decode(str).toString()); + assertEquals(str, TraceNode.decode(str).encode()); + + str = "([Note 0][Note 1]{[Note 2]}{([Note 3])({[Note 4]})})"; + TraceNode t = TraceNode.decode(str); + assertEquals(str, t.encode()); + + assertTrue(t.isRoot()); + assertTrue(t.isStrict()); + assertTrue(!t.isLeaf()); + assertEquals(4, t.getNumChildren()); + + { + TraceNode c = t.getChild(0); + assertTrue(c.isLeaf()); + assertEquals("Note 0", c.getNote()); + } + { + TraceNode c = t.getChild(1); + assertTrue(c.isLeaf()); + assertEquals("Note 1", c.getNote()); + } + { + TraceNode c = t.getChild(2); + assertTrue(!c.isLeaf()); + assertTrue(!c.isStrict()); + assertEquals(1, c.getNumChildren()); + { + TraceNode d = c.getChild(0); + assertTrue(d.isLeaf()); + assertEquals("Note 2", d.getNote()); + } + } + { + TraceNode c = t.getChild(3); + assertTrue(!c.isStrict()); + assertEquals(2, c.getNumChildren()); + { + TraceNode d = c.getChild(0); + assertTrue(d.isStrict()); + assertTrue(!d.isLeaf()); + assertEquals(1, d.getNumChildren()); + { + TraceNode e = d.getChild(0); + assertTrue(e.isLeaf()); + assertEquals("Note 3", e.getNote()); + } + } + { + TraceNode d = c.getChild(1); + assertTrue(d.isStrict()); + assertEquals(1, d.getNumChildren()); + { + TraceNode e = d.getChild(0); + assertTrue(!e.isStrict()); + assertEquals(1, e.getNumChildren()); + { + TraceNode f = e.getChild(0); + assertTrue(f.isLeaf()); + assertEquals("Note 4", f.getNote()); + } + } + } + } + } + + public void testReservedChars() { + TraceNode t = new TraceNode(); + t.addChild("abc(){}[]\\xyz"); + assertEquals("abc(){}[]\\xyz", t.getChild(0).getNote()); + assertEquals("([abc(){}[\\]\\\\xyz])", t.encode()); + { + // test swap/clear/empty here + TraceNode t2 = new TraceNode(); + assertTrue(t2.isEmpty()); + t2.swap(t); + assertTrue(!t2.isEmpty()); + assertEquals("abc(){}[]\\xyz", t2.getChild(0).getNote()); + assertEquals("([abc(){}[\\]\\\\xyz])", t2.encode()); + t2.clear(); + assertTrue(t2.isEmpty()); + } + } + + public void testAdd() { + TraceNode t1 = TraceNode.decode("([x])"); + TraceNode t2 = TraceNode.decode("([y])"); + TraceNode t3 = TraceNode.decode("([z])"); + + t1.addChild(t2); + assertEquals("([x]([y]))", t1.encode()); + assertTrue(t1.getChild(1).isStrict()); + t1.addChild("txt"); + assertTrue(t1.getChild(2).isLeaf()); + assertEquals("([x]([y])[txt])", t1.encode()); + t3.addChild(t1); + assertEquals("([z]([x]([y])[txt]))", t3.encode()); + + // crazy but possible (everything is by value) + t2.addChild(t2).addChild(t2); + assertEquals("([y]([y])([y]([y])))", t2.encode()); + } + + public void testStrict() { + assertEquals("{}", TraceNode.decode("()").setStrict(false).encode()); + assertEquals("{[x]}", TraceNode.decode("([x])").setStrict(false).encode()); + assertEquals("{[x][y]}", TraceNode.decode("([x][y])").setStrict(false).encode()); + } + + public void testTraceLevel() { + Trace t = new Trace(); + t.setLevel(4); + assertEquals(4, t.getLevel()); + t.trace(9, "no"); + assertEquals(0, t.getRoot().getNumChildren()); + t.trace(8, "no"); + assertEquals(0, t.getRoot().getNumChildren()); + t.trace(7, "no"); + assertEquals(0, t.getRoot().getNumChildren()); + t.trace(6, "no"); + assertEquals(0, t.getRoot().getNumChildren()); + t.trace(5, "no"); + assertEquals(0, t.getRoot().getNumChildren()); + t.trace(4, "yes"); + assertEquals(1, t.getRoot().getNumChildren()); + t.trace(3, "yes"); + assertEquals(2, t.getRoot().getNumChildren()); + t.trace(2, "yes"); + assertEquals(3, t.getRoot().getNumChildren()); + t.trace(1, "yes"); + assertEquals(4, t.getRoot().getNumChildren()); + t.trace(0, "yes"); + assertEquals(5, t.getRoot().getNumChildren()); + } + + public void testCompact() { + assertEquals("()", TraceNode.decode("()").compact().encode()); + assertEquals("()", TraceNode.decode("(())").compact().encode()); + assertEquals("()", TraceNode.decode("(()())").compact().encode()); + assertEquals("()", TraceNode.decode("({})").compact().encode()); + assertEquals("()", TraceNode.decode("({}{})").compact().encode()); + assertEquals("()", TraceNode.decode("({{}{}})").compact().encode()); + + assertEquals("([x])", TraceNode.decode("([x])").compact().encode()); + assertEquals("([x])", TraceNode.decode("(([x]))").compact().encode()); + assertEquals("([x][y])", TraceNode.decode("(([x])([y]))").compact().encode()); + assertEquals("([x])", TraceNode.decode("({[x]})").compact().encode()); + assertEquals("([x][y])", TraceNode.decode("({[x]}{[y]})").compact().encode()); + assertEquals("({[x][y]})", TraceNode.decode("({{[x]}{[y]}})").compact().encode()); + + assertEquals("([a][b][c][d])", TraceNode.decode("(([a][b])([c][d]))").compact().encode()); + assertEquals("({[a][b]}{[c][d]})", TraceNode.decode("({[a][b]}{[c][d]})").compact().encode()); + assertEquals("({[a][b][c][d]})", TraceNode.decode("({{[a][b]}{[c][d]}})").compact().encode()); + assertEquals("({([a][b])([c][d])})", TraceNode.decode("({([a][b])([c][d])})").compact().encode()); + + assertEquals("({{}{(({()}({}){()(){}}){})}})", TraceNode.decode("({{}{(({()}({}){()(){}}){})}})").encode()); + assertEquals("()", TraceNode.decode("({{}{(({()}({}){()(){}}){})}})").compact().encode()); + assertEquals("([x])", TraceNode.decode("({{}{([x]({()}({}){()(){}}){})}})").compact().encode()); + assertEquals("([x])", TraceNode.decode("({{}{(({()}({[x]}){()(){}}){})}})").compact().encode()); + assertEquals("([x])", TraceNode.decode("({{}{(({()}({}){()(){}})[x]{})}})").compact().encode()); + + assertEquals("({[a][b][c][d][e][f]})", TraceNode.decode("({({[a][b]})({[c][d]})({[e][f]})})").compact().encode()); + } + + public void testSort() { + assertEquals("([b][a][c])", TraceNode.decode("([b][a][c])").sort().encode()); + assertEquals("({[a][b][c]})", TraceNode.decode("({[b][a][c]})").sort().encode()); + assertEquals("(([c][a])([b]))", TraceNode.decode("(([c][a])([b]))").sort().encode()); + assertEquals("({[b]([c][a])})", TraceNode.decode("({([c][a])[b]})").sort().encode()); + assertEquals("({[a][c]}[b])", TraceNode.decode("({[c][a]}[b])").sort().encode()); + assertEquals("({([b]){[a][c]}})", TraceNode.decode("({{[c][a]}([b])})").sort().encode()); + } + + public void testNormalize() { + TraceNode t1 = TraceNode.decode("({([a][b]{[x][y]([p][q])})([c][d])([e][f])})"); + TraceNode t2 = TraceNode.decode("({([a][b]{[y][x]([p][q])})([c][d])([e][f])})"); + TraceNode t3 = TraceNode.decode("({([a][b]{[y]([p][q])[x]})([c][d])([e][f])})"); + TraceNode t4 = TraceNode.decode("({([e][f])([a][b]{[y]([p][q])[x]})([c][d])})"); + TraceNode t5 = TraceNode.decode("({([e][f])([c][d])([a][b]{([p][q])[y][x]})})"); + + TraceNode tx = TraceNode.decode("({([b][a]{[x][y]([p][q])})([c][d])([e][f])})"); + TraceNode ty = TraceNode.decode("({([a][b]{[x][y]([p][q])})([d][c])([e][f])})"); + TraceNode tz = TraceNode.decode("({([a][b]{[x][y]([q][p])})([c][d])([e][f])})"); + + assertEquals("({([a][b]{[x][y]([p][q])})([c][d])([e][f])})", t1.compact().encode()); + + assertTrue(!t1.compact().encode().equals(t2.compact().encode())); + assertTrue(!t1.compact().encode().equals(t3.compact().encode())); + assertTrue(!t1.compact().encode().equals(t4.compact().encode())); + assertTrue(!t1.compact().encode().equals(t5.compact().encode())); + assertTrue(!t1.compact().encode().equals(tx.compact().encode())); + assertTrue(!t1.compact().encode().equals(ty.compact().encode())); + assertTrue(!t1.compact().encode().equals(tz.compact().encode())); + + System.out.println("1: " + t1.normalize().encode()); + System.out.println("2: " + t2.normalize().encode()); + System.out.println("3: " + t3.normalize().encode()); + System.out.println("4: " + t4.normalize().encode()); + System.out.println("5: " + t5.normalize().encode()); + System.out.println("x: " + tx.normalize().encode()); + System.out.println("y: " + ty.normalize().encode()); + System.out.println("z: " + tz.normalize().encode()); + assertTrue(t1.normalize().encode().equals(t2.normalize().encode())); + assertTrue(t1.normalize().encode().equals(t3.normalize().encode())); + assertTrue(t1.normalize().encode().equals(t4.normalize().encode())); + assertTrue(t1.normalize().encode().equals(t5.normalize().encode())); + assertTrue(!t1.normalize().encode().equals(tx.normalize().encode())); + assertTrue(!t1.normalize().encode().equals(ty.normalize().encode())); + assertTrue(!t1.normalize().encode().equals(tz.normalize().encode())); + + assertEquals("({([c][d])([e][f])([a][b]{[x][y]([p][q])})})", t1.normalize().encode()); + } + + public void testTraceDump() { + { + Trace big = new Trace(); + TraceNode b1 = new TraceNode(); + TraceNode b2 = new TraceNode(); + for (int i = 0; i < 100; ++i) { + b2.addChild("test"); + } + for (int i = 0; i < 10; ++i) { + b1.addChild(b2); + } + for (int i = 0; i < 10; ++i) { + big.getRoot().addChild(b1); + } + String normal = big.toString(); + String full = big.getRoot().toString(); + assertTrue(normal.length() > 30000); + assertTrue(normal.length() < 32000); + assertTrue(full.length() > 50000); + assertEquals(normal.substring(0, 30000), full.substring(0, 30000)); + } + { + TraceNode s1 = new TraceNode(); + TraceNode s2 = new TraceNode(); + s2.addChild("test"); + s2.addChild("test"); + s1.addChild(s2); + s1.addChild(s2); + assertEquals("...\n", s1.toString(0)); + assertEquals("<trace>\n...\n", s1.toString(1)); + assertEquals("<trace>\n" + // 8 8 + " <trace>\n" + // 12 20 + " test\n" + // 13 33 + "...\n", s1.toString(33)); + assertEquals("<trace>\n" + // 8 8 + " test\n" + // 9 17 + " test\n" + // 9 26 + "...\n", s2.toString(26)); + assertEquals("<trace>\n" + // 8 8 + " test\n" + // 9 17 + " test\n" + // 9 26 + "</trace>\n", s2.toString(27)); + assertEquals(s2.toString(27), s2.toString()); + } + } +} diff --git a/messagebus/src/test/java/com/yahoo/messagebus/TraceTripTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/TraceTripTestCase.java new file mode 100755 index 00000000000..e675a8ef98a --- /dev/null +++ b/messagebus/src/test/java/com/yahoo/messagebus/TraceTripTestCase.java @@ -0,0 +1,116 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus; + +import com.yahoo.jrt.ListenFailedException; +import com.yahoo.jrt.slobrok.server.Slobrok; +import com.yahoo.messagebus.network.rpc.test.TestServer; +import com.yahoo.messagebus.routing.RoutingTableSpec; +import com.yahoo.messagebus.test.Receptor; +import com.yahoo.messagebus.test.SimpleMessage; +import com.yahoo.messagebus.test.SimpleProtocol; + +import java.net.UnknownHostException; +import java.util.Arrays; + +/** + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +public class TraceTripTestCase extends junit.framework.TestCase { + + Slobrok slobrok; + TestServer src; + TestServer pxy; + TestServer dst; + + public TraceTripTestCase(String message) { + super(message); + } + + public void setUp() throws ListenFailedException, UnknownHostException { + RoutingTableSpec table = new RoutingTableSpec(SimpleProtocol.NAME) + .addHop("pxy", "test/pxy/session", Arrays.asList("test/pxy/session")) + .addHop("dst", "test/dst/session", Arrays.asList("test/dst/session")) + .addRoute("test", Arrays.asList("pxy", "dst")); + + slobrok = new Slobrok(); + src = new TestServer("test/src", table, slobrok, null, null); + pxy = new TestServer("test/pxy", table, slobrok, null, null); + dst = new TestServer("test/dst", table, slobrok, null, null); + } + + public void tearDown() { + dst.destroy(); + pxy.destroy(); + src.destroy(); + slobrok.stop(); + } + + public void testTrip() { + Receptor src_rr = new Receptor(); + SourceSession src_s = src.mb.createSourceSession(src_rr); + + new Proxy(pxy.mb); + assertTrue(src.waitSlobrok("test/pxy/session", 1)); + + new Server(dst.mb); + assertTrue(src.waitSlobrok("test/dst/session", 1)); + assertTrue(pxy.waitSlobrok("test/dst/session", 1)); + + Message msg = new SimpleMessage(""); + msg.getTrace().setLevel(1); + msg.getTrace().trace(1, "Client message", false); + src_s.send(msg, "test"); + Reply reply = src_rr.getReply(60); + reply.getTrace().trace(1, "Client reply", false); + assertTrue(reply.getNumErrors() == 0); + + TraceNode t = new TraceNode() + .addChild("Client message") + .addChild("Proxy message") + .addChild("Server message") + .addChild("Server reply") + .addChild("Proxy reply") + .addChild("Client reply"); + System.out.println("reply: " + reply.getTrace().getRoot().encode()); + System.out.println("want : " + t.encode()); + assertTrue(reply.getTrace().getRoot().encode().equals(t.encode())); + } + + private static class Proxy implements MessageHandler, ReplyHandler { + private IntermediateSession session; + + public Proxy(MessageBus bus) { + session = bus.createIntermediateSession("session", true, this, this); + } + + public void handleMessage(Message msg) { + msg.getTrace().trace(1, "Proxy message", false); + System.out.println(msg.getTrace().getRoot().encode()); + session.forward(msg); + } + + public void handleReply(Reply reply) { + reply.getTrace().trace(1, "Proxy reply", false); + System.out.println(reply.getTrace().getRoot().encode()); + session.forward(reply); + } + } + + private static class Server implements MessageHandler { + private DestinationSession session; + + public Server(MessageBus bus) { + session = bus.createDestinationSession("session", true, this); + } + + public void handleMessage(Message msg) { + msg.getTrace().trace(1, "Server message", false); + System.out.println(msg.getTrace().getRoot().encode()); + Reply reply = new EmptyReply(); + msg.swapState(reply); + reply.getTrace().trace(1, "Server reply", false); + System.out.println(reply.getTrace().getRoot().encode()); + session.reply(reply); + } + } +} diff --git a/messagebus/src/test/java/com/yahoo/messagebus/network/IdentityTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/network/IdentityTestCase.java new file mode 100644 index 00000000000..2191b2750bf --- /dev/null +++ b/messagebus/src/test/java/com/yahoo/messagebus/network/IdentityTestCase.java @@ -0,0 +1,28 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.network; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +/** + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen Hult</a> + */ +public class IdentityTestCase { + + @Test + public void requireThatAccessorsWork() { + Identity id = new Identity("foo"); + assertNotNull(id.getHostname()); + assertEquals("foo", id.getServicePrefix()); + } + + @Test + public void requireThatCopyConstructorWorks() { + Identity lhs = new Identity("foo"); + Identity rhs = new Identity(lhs); + assertEquals(lhs.getHostname(), rhs.getHostname()); + assertEquals(lhs.getServicePrefix(), rhs.getServicePrefix()); + } +} diff --git a/messagebus/src/test/java/com/yahoo/messagebus/network/local/LocalNetworkTest.java b/messagebus/src/test/java/com/yahoo/messagebus/network/local/LocalNetworkTest.java new file mode 100644 index 00000000000..296e724a502 --- /dev/null +++ b/messagebus/src/test/java/com/yahoo/messagebus/network/local/LocalNetworkTest.java @@ -0,0 +1,132 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.network.local; + +import com.yahoo.messagebus.DestinationSession; +import com.yahoo.messagebus.DestinationSessionParams; +import com.yahoo.messagebus.EmptyReply; +import com.yahoo.messagebus.IntermediateSession; +import com.yahoo.messagebus.IntermediateSessionParams; +import com.yahoo.messagebus.Message; +import com.yahoo.messagebus.MessageBus; +import com.yahoo.messagebus.MessageBusParams; +import com.yahoo.messagebus.MessageHandler; +import com.yahoo.messagebus.Reply; +import com.yahoo.messagebus.ReplyHandler; +import com.yahoo.messagebus.SourceSession; +import com.yahoo.messagebus.SourceSessionParams; +import com.yahoo.messagebus.routing.Hop; +import com.yahoo.messagebus.routing.Route; +import com.yahoo.messagebus.test.SimpleMessage; +import com.yahoo.messagebus.test.SimpleProtocol; +import com.yahoo.messagebus.test.SimpleReply; +import org.junit.Test; + +import java.util.concurrent.BlockingDeque; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + +/** + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen Hult</a> + */ +public class LocalNetworkTest { + + @Test + public void requireThatLocalNetworkCanSendAndReceive() throws InterruptedException { + final LocalWire wire = new LocalWire(); + + final Server serverA = new Server(wire); + final SourceSession source = serverA.newSourceSession(); + + final Server serverB = new Server(wire); + final IntermediateSession intermediate = serverB.newIntermediateSession(); + + final Server serverC = new Server(wire); + final DestinationSession destination = serverC.newDestinationSession(); + + Message msg = new SimpleMessage("foo"); + msg.setRoute(new Route().addHop(Hop.parse(intermediate.getConnectionSpec())) + .addHop(Hop.parse(destination.getConnectionSpec()))); + assertThat(source.send(msg).isAccepted(), is(true)); + + msg = serverB.messages.poll(60, TimeUnit.SECONDS); + assertThat(msg, instanceOf(SimpleMessage.class)); + assertThat(((SimpleMessage)msg).getValue(), is("foo")); + intermediate.forward(msg); + + msg = serverC.messages.poll(60, TimeUnit.SECONDS); + assertThat(msg, instanceOf(SimpleMessage.class)); + assertThat(((SimpleMessage)msg).getValue(), is("foo")); + Reply reply = new SimpleReply("bar"); + reply.swapState(msg); + destination.reply(reply); + + reply = serverB.replies.poll(60, TimeUnit.SECONDS); + assertThat(reply, instanceOf(SimpleReply.class)); + assertThat(((SimpleReply)reply).getValue(), is("bar")); + intermediate.forward(reply); + + reply = serverA.replies.poll(60, TimeUnit.SECONDS); + assertThat(reply, instanceOf(SimpleReply.class)); + assertThat(((SimpleReply)reply).getValue(), is("bar")); + + serverA.mbus.destroy(); + serverB.mbus.destroy(); + serverC.mbus.destroy(); + } + + @Test + public void requireThatUnknownServiceRepliesWithNoAddressForService() throws InterruptedException { + final Server server = new Server(new LocalWire()); + final SourceSession source = server.newSourceSession(); + + final Message msg = new SimpleMessage("foo").setRoute(Route.parse("bar")); + assertThat(source.send(msg).isAccepted(), is(true)); + final Reply reply = server.replies.poll(60, TimeUnit.SECONDS); + assertThat(reply, instanceOf(EmptyReply.class)); + + server.mbus.destroy(); + } + + private static class Server implements MessageHandler, ReplyHandler { + + final MessageBus mbus; + final BlockingDeque<Message> messages = new LinkedBlockingDeque<>(); + final BlockingDeque<Reply> replies = new LinkedBlockingDeque<>(); + + Server(final LocalWire wire) { + mbus = new MessageBus(new LocalNetwork(wire), + new MessageBusParams().addProtocol(new SimpleProtocol()) + .setRetryPolicy(null)); + } + + SourceSession newSourceSession() { + return mbus.createSourceSession( + new SourceSessionParams().setTimeout(600.0).setReplyHandler(this)); + } + + IntermediateSession newIntermediateSession() { + return mbus.createIntermediateSession(new IntermediateSessionParams() + .setMessageHandler(this) + .setReplyHandler(this)); + } + + DestinationSession newDestinationSession() { + return mbus.createDestinationSession(new DestinationSessionParams() + .setMessageHandler(this)); + } + + @Override + public void handleMessage(final Message msg) { + messages.addLast(msg); + } + + @Override + public void handleReply(final Reply reply) { + replies.addLast(reply); + } + } +} diff --git a/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/BasicNetworkTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/BasicNetworkTestCase.java new file mode 100644 index 00000000000..7fe481a2196 --- /dev/null +++ b/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/BasicNetworkTestCase.java @@ -0,0 +1,152 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.network.rpc; + + +import com.yahoo.concurrent.SystemTimer; +import com.yahoo.jrt.ListenFailedException; +import com.yahoo.jrt.slobrok.server.Slobrok; +import com.yahoo.messagebus.*; +import com.yahoo.messagebus.network.rpc.test.TestServer; +import com.yahoo.messagebus.routing.Route; +import com.yahoo.messagebus.routing.RoutingTableSpec; +import com.yahoo.messagebus.test.Receptor; +import com.yahoo.messagebus.test.SimpleMessage; +import com.yahoo.messagebus.test.SimpleProtocol; +import com.yahoo.messagebus.test.SimpleReply; + +import java.net.UnknownHostException; +import java.util.Arrays; + + +/** + * @author <a href="mailto:havardpe@yahoo-inc.com">Haavard Pettersen</a> + */ +public class BasicNetworkTestCase extends junit.framework.TestCase { + + Slobrok slobrok; + TestServer src; + TestServer pxy; + TestServer dst; + + public void setUp() throws ListenFailedException, UnknownHostException { + RoutingTableSpec table = new RoutingTableSpec(SimpleProtocol.NAME); + table.addHop("pxy", "test/pxy/session", Arrays.asList("test/pxy/session")); + table.addHop("dst", "test/dst/session", Arrays.asList("test/dst/session")); + table.addRoute("test", Arrays.asList("pxy", "dst")); + slobrok = new Slobrok(); + src = new TestServer("test/src", table, slobrok, null, null); + pxy = new TestServer("test/pxy", table, slobrok, null, null); + dst = new TestServer("test/dst", table, slobrok, null, null); + } + + public void tearDown() { + dst.destroy(); + pxy.destroy(); + src.destroy(); + slobrok.stop(); + } + + public void testNetwork() { + // set up receptors + Receptor src_rr = new Receptor(); + Receptor pxy_mr = new Receptor(); + Receptor pxy_rr = new Receptor(); + Receptor dst_mr = new Receptor(); + + // set up sessions + SourceSessionParams sp = new SourceSessionParams(); + sp.setTimeout(30.0); + + SourceSession ss = src.mb.createSourceSession(src_rr, sp); + IntermediateSession is = pxy.mb.createIntermediateSession("session", true, pxy_mr, pxy_rr); + DestinationSession ds = dst.mb.createDestinationSession("session", true, dst_mr); + + // wait for slobrok registration + assertTrue(src.waitSlobrok("test/pxy/session", 1)); + assertTrue(src.waitSlobrok("test/dst/session", 1)); + assertTrue(pxy.waitSlobrok("test/dst/session", 1)); + + // send message on client + ss.send(new SimpleMessage("test message"), "test"); + + // check message on proxy + Message msg = pxy_mr.getMessage(60); + assertTrue(msg != null); + assertEquals(SimpleProtocol.MESSAGE, msg.getType()); + SimpleMessage sm = (SimpleMessage) msg; + assertEquals("test message", sm.getValue()); + + // forward message on proxy + sm.setValue(sm.getValue() + " pxy"); + is.forward(sm); + + // check message on server + msg = dst_mr.getMessage(60); + assertTrue(msg != null); + assertEquals(SimpleProtocol.MESSAGE, msg.getType()); + sm = (SimpleMessage) msg; + assertEquals("test message pxy", sm.getValue()); + + // send reply on server + SimpleReply sr = new SimpleReply("test reply"); + sm.swapState(sr); + ds.reply(sr); + + // check reply on proxy + Reply reply = pxy_rr.getReply(60); + assertTrue(reply != null); + assertEquals(SimpleProtocol.REPLY, reply.getType()); + sr = (SimpleReply) reply; + assertEquals("test reply", sr.getValue()); + + // forward reply on proxy + sr.setValue(sr.getValue() + " pxy"); + is.forward(sr); + + // check reply on client + reply = src_rr.getReply(60); + assertTrue(reply != null); + assertEquals(SimpleProtocol.REPLY, reply.getType()); + sr = (SimpleReply) reply; + assertEquals("test reply pxy", sr.getValue()); + + ss.destroy(); + is.destroy(); + ds.destroy(); + } + + public void testTimeoutsFollowMessage() { + SourceSessionParams params = new SourceSessionParams().setTimeout(600.0); + SourceSession ss = src.mb.createSourceSession(new Receptor(), params); + DestinationSession ds = dst.mb.createDestinationSession("session", true, new Receptor()); + assertTrue(src.waitSlobrok("test/dst/session", 1)); + + // Test default timeouts being set. + Message msg = new SimpleMessage("msg"); + msg.getTrace().setLevel(9); + long now = SystemTimer.INSTANCE.milliTime(); + assertTrue(ss.send(msg, Route.parse("dst")).isAccepted()); + + assertNotNull(msg = ((Receptor)ds.getMessageHandler()).getMessage(60)); + assertTrue(msg.getTimeReceived() >= now); + assertTrue(params.getTimeout() * 1000 >= msg.getTimeRemaining()); + ds.acknowledge(msg); + + assertNotNull(((Receptor)ss.getReplyHandler()).getReply(60)); + + // Test default timeouts being overwritten. + msg = new SimpleMessage("msg"); + msg.getTrace().setLevel(9); + msg.setTimeRemaining(2 * (long)(params.getTimeout() * 1000)); + assertTrue(ss.send(msg, Route.parse("dst")).isAccepted()); + + assertNotNull(msg = ((Receptor)ds.getMessageHandler()).getMessage(60)); + assertTrue(params.getTimeout() * 1000 < msg.getTimeRemaining()); + ds.acknowledge(msg); + + assertNotNull(((Receptor)ss.getReplyHandler()).getReply(60)); + + ss.destroy(); + ds.destroy(); + } +} diff --git a/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/LoadBalanceTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/LoadBalanceTestCase.java new file mode 100644 index 00000000000..3f6c449679e --- /dev/null +++ b/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/LoadBalanceTestCase.java @@ -0,0 +1,96 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.network.rpc; + +import com.yahoo.jrt.ListenFailedException; +import com.yahoo.jrt.slobrok.server.Slobrok; +import com.yahoo.messagebus.*; +import com.yahoo.messagebus.network.Identity; +import com.yahoo.messagebus.network.rpc.test.TestServer; +import com.yahoo.messagebus.routing.Route; +import com.yahoo.messagebus.test.QueueAdapter; +import com.yahoo.messagebus.test.SimpleMessage; + +import java.net.UnknownHostException; + +/** + * @author <a href="mailto:havardpe@yahoo-inc.com">Haavard Pettersen</a> + */ +public class LoadBalanceTestCase extends junit.framework.TestCase { + + public void testLoadBalance() throws ListenFailedException, UnknownHostException { + Slobrok slobrok = new Slobrok(); + TestServer src = new TestServer("src", null, slobrok, null, null); + TestServer dst1 = new TestServer("dst/1", null, slobrok, null, null); + TestServer dst2 = new TestServer("dst/2", null, slobrok, null, null); + TestServer dst3 = new TestServer("dst/3", null, slobrok, null, null); + + // set up handlers + final QueueAdapter sq = new QueueAdapter(); + SourceSession ss = src.mb.createSourceSession(new SourceSessionParams().setTimeout(600.0).setThrottlePolicy(null) + .setReplyHandler(new ReplyHandler() { + @Override + public void handleReply(Reply reply) { + System.out.println(Thread.currentThread().getName() + ": Reply '" + + ((SimpleMessage)reply.getMessage()).getValue() + "' received at source."); + sq.handleReply(reply); + } + })); + SimpleDestination h1 = new SimpleDestination(dst1.mb, dst1.net.getIdentity()); + SimpleDestination h2 = new SimpleDestination(dst2.mb, dst2.net.getIdentity()); + SimpleDestination h3 = new SimpleDestination(dst3.mb, dst3.net.getIdentity()); + assertTrue(src.waitSlobrok("dst/*/session", 3)); + + // send messages + int msgCnt = 30; // should be divisible by 3 + for (int i = 0; i < msgCnt; ++i) { + ss.send(new SimpleMessage("msg" + i), Route.parse("dst/*/session")); + } + + // wait for replies + assertTrue(sq.waitSize(msgCnt, 60)); + + // check handler message distribution + assertEquals(msgCnt / 3, h1.getCount()); + assertEquals(msgCnt / 3, h2.getCount()); + assertEquals(msgCnt / 3, h3.getCount()); + + ss.destroy(); + h1.session.destroy(); + h2.session.destroy(); + h3.session.destroy(); + + dst3.destroy(); + dst2.destroy(); + dst1.destroy(); + src.destroy(); + slobrok.stop(); + } + + /** + * Implements a simple destination that counts and acknowledges all messages received. + */ + private static class SimpleDestination implements MessageHandler { + + final DestinationSession session; + final String ident; + int cnt = 0; + + SimpleDestination(MessageBus mb, Identity ident) { + this.session = mb.createDestinationSession("session", true, this); + this.ident = ident.getServicePrefix(); + } + + @Override + public synchronized void handleMessage(Message msg) { + System.out.println( + Thread.currentThread().getName() + ": " + + "Message '" + ((SimpleMessage)msg).getValue() + "' received at '" + ident + "'."); + session.acknowledge(msg); + ++cnt; + } + + public synchronized int getCount() { + return cnt; + } + } +} diff --git a/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/OOSTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/OOSTestCase.java new file mode 100755 index 00000000000..e65621e6a10 --- /dev/null +++ b/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/OOSTestCase.java @@ -0,0 +1,200 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.network.rpc; + +import com.yahoo.jrt.ListenFailedException; +import com.yahoo.jrt.slobrok.server.Slobrok; +import com.yahoo.messagebus.*; +import com.yahoo.messagebus.network.Identity; +import com.yahoo.messagebus.network.rpc.test.OOSServer; +import com.yahoo.messagebus.network.rpc.test.OOSState; +import com.yahoo.messagebus.network.rpc.test.TestServer; +import com.yahoo.messagebus.routing.Route; +import com.yahoo.messagebus.test.Receptor; +import com.yahoo.messagebus.test.SimpleMessage; +import com.yahoo.messagebus.test.SimpleProtocol; + +import java.net.UnknownHostException; + +/** + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +public class OOSTestCase extends junit.framework.TestCase { + + private static class MyServer extends TestServer implements MessageHandler { + DestinationSession session; + + public MyServer(String name, Slobrok slobrok, String oosServerPattern) + throws ListenFailedException, UnknownHostException + { + super(new MessageBusParams().setRetryPolicy(null).addProtocol(new SimpleProtocol()), + new RPCNetworkParams() + .setIdentity(new Identity(name)) + .setSlobrokConfigId(TestServer.getSlobrokConfig(slobrok)) + .setOOSServerPattern(oosServerPattern)); + session = mb.createDestinationSession("session", true, this); + } + + public boolean destroy() { + session.destroy(); + return super.destroy(); + } + + public void handleMessage(Message msg) { + session.acknowledge(msg); + } + } + + private static void assertError(SourceSession src, String dst, int error) { + Message msg = new SimpleMessage("msg"); + msg.getTrace().setLevel(9); + assertTrue(src.send(msg, Route.parse(dst)).isAccepted()); + Reply reply = ((Receptor) src.getReplyHandler()).getReply(60); + assertNotNull(reply); + System.out.println(reply.getTrace()); + if (error == ErrorCode.NONE) { + assertFalse(reply.hasErrors()); + } else { + assertTrue(reply.hasErrors()); + assertEquals(error, reply.getError(0).getCode()); + } + } + + public void testOOS() throws ListenFailedException, UnknownHostException { + Slobrok slobrok = new Slobrok(); + TestServer srcServer = new TestServer("src", null, slobrok, "oos/*", null); + SourceSession srcSession = srcServer.mb.createSourceSession(new Receptor()); + + MyServer dst1 = new MyServer("dst1", slobrok, null); + MyServer dst2 = new MyServer("dst2", slobrok, null); + MyServer dst3 = new MyServer("dst3", slobrok, null); + MyServer dst4 = new MyServer("dst4", slobrok, null); + MyServer dst5 = new MyServer("dst5", slobrok, null); + assertTrue(srcServer.waitSlobrok("*/session", 5)); + + // Ensure that normal sending is ok. + assertError(srcSession, "dst1/session", ErrorCode.NONE); + assertError(srcSession, "dst2/session", ErrorCode.NONE); + assertError(srcSession, "dst3/session", ErrorCode.NONE); + assertError(srcSession, "dst4/session", ErrorCode.NONE); + assertError(srcSession, "dst5/session", ErrorCode.NONE); + + // Ensure that 2 OOS services report properly. + OOSServer oosServer = new OOSServer(slobrok, "oos/1", new OOSState() + .add("dst2/session", true) + .add("dst3/session", true)); + assertTrue(srcServer.waitSlobrok("oos/*", 1)); + assertTrue(srcServer.waitState(new OOSState() + .add("dst2/session", true) + .add("dst3/session", true))); + assertError(srcSession, "dst1/session", ErrorCode.NONE); + assertError(srcSession, "dst2/session", ErrorCode.SERVICE_OOS); + assertError(srcSession, "dst3/session", ErrorCode.SERVICE_OOS); + assertError(srcSession, "dst4/session", ErrorCode.NONE); + assertError(srcSession, "dst5/session", ErrorCode.NONE); + + // Ensure that 1 OOS service may come up while other stays down. + oosServer.setState(new OOSState().add("dst2/session", true)); + assertTrue(srcServer.waitState(new OOSState() + .add("dst2/session", true) + .add("dst3/session", false))); + assertError(srcSession, "dst1/session", ErrorCode.NONE); + assertError(srcSession, "dst2/session", ErrorCode.SERVICE_OOS); + assertError(srcSession, "dst3/session", ErrorCode.NONE); + assertError(srcSession, "dst4/session", ErrorCode.NONE); + assertError(srcSession, "dst5/session", ErrorCode.NONE); + + // Add another OOS server and make sure that it works properly. + OOSServer oosServer2 = new OOSServer(slobrok, "oos/2", new OOSState() + .add("dst4/session", true) + .add("dst5/session", true)); + assertTrue(srcServer.waitSlobrok("oos/*", 2)); + assertTrue(srcServer.waitState(new OOSState() + .add("dst2/session", true) + .add("dst4/session", true) + .add("dst5/session", true))); + assertError(srcSession, "dst1/session", ErrorCode.NONE); + assertError(srcSession, "dst2/session", ErrorCode.SERVICE_OOS); + assertError(srcSession, "dst3/session", ErrorCode.NONE); + assertError(srcSession, "dst4/session", ErrorCode.SERVICE_OOS); + assertError(srcSession, "dst5/session", ErrorCode.SERVICE_OOS); + oosServer2.shutdown(); + + // Ensure that shutting down one OOS server will properly propagate. + assertTrue(srcServer.waitSlobrok("oos/*", 1)); + assertTrue(srcServer.waitState(new OOSState() + .add("dst1/session", false) + .add("dst2/session", true) + .add("dst3/session", false) + .add("dst4/session", false) + .add("dst5/session", false))); + assertError(srcSession, "dst1/session", ErrorCode.NONE); + assertError(srcSession, "dst2/session", ErrorCode.SERVICE_OOS); + assertError(srcSession, "dst3/session", ErrorCode.NONE); + assertError(srcSession, "dst4/session", ErrorCode.NONE); + assertError(srcSession, "dst5/session", ErrorCode.NONE); + + // Now add two new OOS servers and make sure that works too. + OOSServer oosServer3 = new OOSServer(slobrok, "oos/3", new OOSState() + .add("dst2/session", true) + .add("dst4/session", true)); + OOSServer oosServer4 = new OOSServer(slobrok, "oos/4", new OOSState() + .add("dst2/session", true) + .add("dst3/session", true) + .add("dst5/session", true)); + assertTrue(srcServer.waitSlobrok("oos/*", 3)); + assertTrue(srcServer.waitState(new OOSState() + .add("dst2/session", true) + .add("dst3/session", true) + .add("dst4/session", true) + .add("dst5/session", true))); + assertError(srcSession, "dst1/session", ErrorCode.NONE); + assertError(srcSession, "dst2/session", ErrorCode.SERVICE_OOS); + assertError(srcSession, "dst3/session", ErrorCode.SERVICE_OOS); + assertError(srcSession, "dst4/session", ErrorCode.SERVICE_OOS); + assertError(srcSession, "dst5/session", ErrorCode.SERVICE_OOS); + + // Modify the state of the two new servers and make sure it propagates. + oosServer3.setState(new OOSState() + .add("dst2/session", true)); + oosServer4.setState(new OOSState() + .add("dst1/session", true)); + assertTrue(srcServer.waitState(new OOSState() + .add("dst1/session", true) + .add("dst2/session", true) + .add("dst3/session", false) + .add("dst4/session", false) + .add("dst5/session", false))); + assertError(srcSession, "dst1/session", ErrorCode.SERVICE_OOS); + assertError(srcSession, "dst2/session", ErrorCode.SERVICE_OOS); + assertError(srcSession, "dst3/session", ErrorCode.NONE); + assertError(srcSession, "dst4/session", ErrorCode.NONE); + assertError(srcSession, "dst5/session", ErrorCode.NONE); + oosServer3.shutdown(); + oosServer4.shutdown(); + + // Ensure that shutting down the two latest OOS servers works properly. + assertTrue(srcServer.waitSlobrok("oos/*", 1)); + assertTrue(srcServer.waitState(new OOSState() + .add("dst1/session", false) + .add("dst2/session", true) + .add("dst3/session", false) + .add("dst4/session", false) + .add("dst5/session", false))); + assertError(srcSession, "dst1/session", ErrorCode.NONE); + assertError(srcSession, "dst2/session", ErrorCode.SERVICE_OOS); + assertError(srcSession, "dst3/session", ErrorCode.NONE); + assertError(srcSession, "dst4/session", ErrorCode.NONE); + assertError(srcSession, "dst5/session", ErrorCode.NONE); + + dst2.destroy(); + assertTrue(srcServer.waitSlobrok("*/session", 4)); + assertError(srcSession, "dst2/session", ErrorCode.SERVICE_OOS); + + srcSession.destroy(); + dst1.destroy(); + dst2.destroy(); + dst3.destroy(); + dst4.destroy(); + dst5.destroy(); + } +} diff --git a/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/RPCNetworkTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/RPCNetworkTestCase.java new file mode 100644 index 00000000000..b2927611248 --- /dev/null +++ b/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/RPCNetworkTestCase.java @@ -0,0 +1,100 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.network.rpc; + +import com.yahoo.component.Version; +import com.yahoo.jrt.slobrok.server.Slobrok; +import com.yahoo.messagebus.*; +import com.yahoo.messagebus.metrics.MetricSet; +import com.yahoo.messagebus.network.rpc.test.TestServer; +import com.yahoo.messagebus.routing.Route; +import com.yahoo.messagebus.routing.RoutingPolicy; +import com.yahoo.messagebus.test.Receptor; +import com.yahoo.text.Utf8String; +import org.junit.Test; + +import java.io.PrintWriter; +import java.io.StringWriter; + +import static org.junit.Assert.*; + +/** + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen Hult</a> + */ +public class RPCNetworkTestCase { + + @Test + public void requireThatProtocolEncodeExceptionIsCaught() throws Exception { + RuntimeException e = new RuntimeException(); + + Slobrok slobrok = new Slobrok(); + TestServer server = new TestServer(new MessageBusParams().addProtocol(MyProtocol.newEncodeException(e)), + new RPCNetworkParams().setSlobrokConfigId(slobrok.configId())); + Receptor receptor = new Receptor(); + SourceSession src = server.mb.createSourceSession( + new SourceSessionParams().setTimeout(600.0).setReplyHandler(receptor)); + DestinationSession dst = server.mb.createDestinationSession(new DestinationSessionParams()); + assertTrue(src.send(new MyMessage().setRoute(Route.parse(dst.getConnectionSpec()))).isAccepted()); + + Reply reply = receptor.getReply(60); + assertNotNull(reply); + assertEquals(1, reply.getNumErrors()); + + StringWriter expected = new StringWriter(); + e.printStackTrace(new PrintWriter(expected)); + + String actual = reply.getError(0).toString(); + assertTrue(actual, actual.contains(expected.toString())); + } + + private static class MyMessage extends Message { + + @Override + public Utf8String getProtocol() { + return new Utf8String(MyProtocol.NAME); + } + + @Override + public int getType() { + return 0; + } + } + + private static class MyProtocol implements Protocol { + + final static String NAME = "myProtocol"; + final RuntimeException encodeException; + + MyProtocol(RuntimeException encodeException) { + this.encodeException = encodeException; + } + + @Override + public String getName() { + return NAME; + } + + @Override + public byte[] encode(Version version, Routable routable) { + throw encodeException; + } + + @Override + public Routable decode(Version version, byte[] payload) { + return null; + } + + @Override + public RoutingPolicy createPolicy(String name, String param) { + return null; + } + + @Override + public MetricSet getMetrics() { + return null; + } + + static MyProtocol newEncodeException(RuntimeException e) { + return new MyProtocol(e); + } + } +} diff --git a/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/SendAdapterTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/SendAdapterTestCase.java new file mode 100755 index 00000000000..d296d7c7058 --- /dev/null +++ b/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/SendAdapterTestCase.java @@ -0,0 +1,157 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.network.rpc;
+
+import com.yahoo.component.Version;
+import com.yahoo.jrt.ListenFailedException;
+import com.yahoo.jrt.slobrok.server.Slobrok;
+import com.yahoo.messagebus.*;
+import com.yahoo.messagebus.network.Identity;
+import com.yahoo.messagebus.network.rpc.test.TestServer;
+import com.yahoo.messagebus.routing.Route;
+import com.yahoo.messagebus.test.Receptor;
+import com.yahoo.messagebus.test.SimpleMessage;
+import com.yahoo.messagebus.test.SimpleProtocol;
+import com.yahoo.messagebus.test.SimpleReply;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.*;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public class SendAdapterTestCase {
+
+ ////////////////////////////////////////////////////////////////////////////////
+ //
+ // Setup
+ //
+ ////////////////////////////////////////////////////////////////////////////////
+
+ Slobrok slobrok;
+ TestServer srcServer, itrServer, dstServer;
+ SourceSession srcSession;
+ IntermediateSession itrSession;
+ DestinationSession dstSession;
+ TestProtocol srcProtocol, itrProtocol, dstProtocol;
+
+ @Before
+ public void setUp() throws ListenFailedException, UnknownHostException {
+ slobrok = new Slobrok();
+ dstServer = new TestServer(
+ new MessageBusParams().addProtocol(dstProtocol = new TestProtocol()),
+ new RPCNetworkParams().setIdentity(new Identity("dst")).setSlobrokConfigId(TestServer.getSlobrokConfig(slobrok)));
+ dstSession = dstServer.mb.createDestinationSession(
+ new DestinationSessionParams().setName("session").setMessageHandler(new Receptor()));
+ itrServer = new TestServer(
+ new MessageBusParams().addProtocol(itrProtocol = new TestProtocol()),
+ new RPCNetworkParams().setIdentity(new Identity("itr")).setSlobrokConfigId(TestServer.getSlobrokConfig(slobrok)));
+ itrSession = itrServer.mb.createIntermediateSession(
+ new IntermediateSessionParams().setName("session").setMessageHandler(new Receptor()).setReplyHandler(new Receptor()));
+ srcServer = new TestServer(
+ new MessageBusParams().addProtocol(srcProtocol = new TestProtocol()),
+ new RPCNetworkParams().setSlobrokConfigId(TestServer.getSlobrokConfig(slobrok)));
+ srcSession = srcServer.mb.createSourceSession(
+ new SourceSessionParams().setTimeout(600.0).setReplyHandler(new Receptor()));
+ assertTrue(srcServer.waitSlobrok("*/session", 2));
+ }
+
+ @After
+ public void tearDown() {
+ slobrok.stop();
+ dstSession.destroy();
+ dstServer.destroy();
+ itrSession.destroy();
+ itrServer.destroy();
+ srcSession.destroy();
+ srcServer.destroy();
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////
+ //
+ // Tests
+ //
+ ////////////////////////////////////////////////////////////////////////////////
+
+ @Test
+ public void requireThatMessagesCanBeSentAcrossAllSupportedVersions() throws Exception {
+ List<Version> versions = Arrays.asList(new Version(5, 0), new Version(5, 1));
+ for (Version srcVersion : versions) {
+ for (Version itrVersion : versions) {
+ for (Version dstVersion : versions) {
+ assertVersionedSend(srcVersion, itrVersion, dstVersion);
+ }
+ }
+ }
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////
+ //
+ // Utilities
+ //
+ ////////////////////////////////////////////////////////////////////////////////
+
+ private void assertVersionedSend(Version srcVersion, Version itrVersion, Version dstVersion) {
+ System.out.println("Sending from " + srcVersion + " through " + itrVersion + " to " + dstVersion + ":");
+ srcServer.net.setVersion(srcVersion);
+ itrServer.net.setVersion(itrVersion);
+ dstServer.net.setVersion(dstVersion);
+
+ Message msg = new SimpleMessage("foo");
+ msg.getTrace().setLevel(9);
+ assertTrue(srcSession.send(msg, Route.parse("itr/session dst/session")).isAccepted());
+ assertNotNull(msg = ((Receptor)itrSession.getMessageHandler()).getMessage(300));
+ System.out.println("\tMessage version " + srcProtocol.lastVersion + " serialized at source.");
+ Version minVersion = srcVersion.compareTo(itrVersion) < 0 ? srcVersion : itrVersion;
+ assertEquals(minVersion, srcProtocol.lastVersion);
+
+ System.out.println("\tMessage version " + itrProtocol.lastVersion + " reached intermediate.");
+ assertEquals(minVersion, itrProtocol.lastVersion);
+ itrSession.forward(msg);
+ assertNotNull(msg = ((Receptor)dstSession.getMessageHandler()).getMessage(300));
+ System.out.println("\tMessage version " + itrProtocol.lastVersion + " serialized at intermediate.");
+ minVersion = itrVersion.compareTo(dstVersion) < 0 ? itrVersion : dstVersion;
+ assertEquals(minVersion, itrProtocol.lastVersion);
+
+ System.out.println("\tMessage version " + dstProtocol.lastVersion + " reached destination.");
+ assertEquals(minVersion, dstProtocol.lastVersion);
+ Reply reply = new SimpleReply("bar");
+ reply.swapState(msg);
+ dstSession.reply(reply);
+ assertNotNull(reply = ((Receptor)itrSession.getReplyHandler()).getReply(300));
+ System.out.println("\tReply version " + dstProtocol.lastVersion + " serialized at destination.");
+ assertEquals(minVersion, dstProtocol.lastVersion);
+
+ System.out.println("\tReply version " + itrProtocol.lastVersion + " reached intermediate.");
+ assertEquals(minVersion, itrProtocol.lastVersion);
+ itrSession.forward(reply);
+ assertNotNull(((Receptor)srcSession.getReplyHandler()).getReply(300));
+ System.out.println("\tReply version " + itrProtocol.lastVersion + " serialized at intermediate.");
+ minVersion = srcVersion.compareTo(itrVersion) < 0 ? srcVersion : itrVersion;
+ assertEquals(minVersion, itrProtocol.lastVersion);
+
+ System.out.println("\tReply version " + srcProtocol.lastVersion + " reached source.");
+ assertEquals(minVersion, srcProtocol.lastVersion);
+ }
+
+ private static class TestProtocol extends SimpleProtocol {
+
+ Version lastVersion;
+
+ @Override
+ public byte[] encode(Version version, Routable routable) {
+ lastVersion = version;
+ return super.encode(version, routable);
+ }
+
+ public Routable decode(Version version, byte[] payload) {
+ lastVersion = version;
+ return super.decode(version, payload);
+ }
+ }
+}
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/ServiceAddressTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/ServiceAddressTestCase.java new file mode 100755 index 00000000000..e69896e2bcf --- /dev/null +++ b/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/ServiceAddressTestCase.java @@ -0,0 +1,90 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.network.rpc;
+
+import com.yahoo.jrt.ListenFailedException;
+import com.yahoo.jrt.Spec;
+import com.yahoo.jrt.slobrok.api.Mirror;
+import com.yahoo.jrt.slobrok.server.Slobrok;
+import com.yahoo.messagebus.network.Identity;
+
+import java.net.UnknownHostException;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public class ServiceAddressTestCase extends junit.framework.TestCase {
+
+ private Slobrok slobrok;
+ private RPCNetwork network;
+
+ public ServiceAddressTestCase(String msg) {
+ super(msg);
+ }
+
+ public void setUp() throws ListenFailedException, UnknownHostException {
+ slobrok = new Slobrok();
+ network = new RPCNetwork(new RPCNetworkParams()
+ .setIdentity(new Identity("foo"))
+ .setSlobrokConfigId("raw:slobrok[1]\nslobrok[0].connectionspec \"" +
+ new Spec("localhost", slobrok.port()).toString() + "\"\n"));
+ }
+
+ public void tearDown() {
+ network.shutdown();
+ slobrok.stop();
+ }
+
+ public void testAddrServiceAddress() {
+ assertNullAddress("tcp");
+ assertNullAddress("tcp/");
+ assertNullAddress("tcp/localhost");
+ assertNullAddress("tcp/localhost:");
+ assertNullAddress("tcp/localhost:1977");
+ assertNullAddress("tcp/localhost:1977/");
+ assertAddress("tcp/localhost:1977/session", "tcp/localhost:1977", "session");
+ assertNullAddress("tcp/localhost:/session");
+ //assertNullAddress("tcp/:1977/session");
+ assertNullAddress("tcp/:/session");
+ }
+
+ public void testNameServiceAddress() {
+ network.unregisterSession("session");
+ assertTrue(waitSlobrok("foo/session", 0));
+ assertNullAddress("foo/session");
+
+ network.registerSession("session");
+ assertTrue(waitSlobrok("foo/session", 1));
+ assertAddress("foo/session", network.getConnectionSpec(), "session");
+ }
+
+ private boolean waitSlobrok(String pattern, int num) {
+ for (int i = 0; i < 1000 && !Thread.currentThread().isInterrupted(); ++i) {
+ Mirror.Entry[] res = network.getMirror().lookup(pattern);
+ if (res.length == num) {
+ return true;
+ }
+ try {
+ Thread.sleep(10);
+ }
+ catch (InterruptedException e) {
+ // ignore
+ }
+ }
+ return false;
+ }
+
+ private void assertNullAddress(String pattern) {
+ assertNull(new RPCService(network.getMirror(), pattern).resolve());
+ }
+
+ private void assertAddress(String pattern, String expectedSpec, String expectedSession) {
+ RPCService service = new RPCService(network.getMirror(), pattern);
+ RPCServiceAddress obj = service.resolve();
+ assertNotNull(obj);
+ assertNotNull(obj.getConnectionSpec());
+ assertEquals(expectedSpec, obj.getConnectionSpec().toString());
+ if (expectedSession != null) {
+ assertEquals(expectedSession, obj.getSessionName());
+ }
+ }
+}
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/ServicePoolTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/ServicePoolTestCase.java new file mode 100644 index 00000000000..42580b963a5 --- /dev/null +++ b/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/ServicePoolTestCase.java @@ -0,0 +1,57 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.network.rpc;
+
+import com.yahoo.jrt.ListenFailedException;
+import com.yahoo.jrt.slobrok.server.Slobrok;
+import com.yahoo.messagebus.network.rpc.test.TestServer;
+import junit.framework.TestCase;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public class ServicePoolTestCase extends TestCase {
+
+ public void testMaxSize() throws ListenFailedException {
+ Slobrok slobrok = new Slobrok();
+ RPCNetwork net = new RPCNetwork(new RPCNetworkParams().setSlobrokConfigId(TestServer.getSlobrokConfig(slobrok)));
+ RPCServicePool pool = new RPCServicePool(net, 2);
+
+ pool.resolve("foo");
+ assertEquals(1, pool.getSize());
+ assertTrue(pool.hasService("foo"));
+ assertTrue(!pool.hasService("bar"));
+ assertTrue(!pool.hasService("baz"));
+
+ pool.resolve("foo");
+ assertEquals(1, pool.getSize());
+ assertTrue(pool.hasService("foo"));
+ assertTrue(!pool.hasService("bar"));
+ assertTrue(!pool.hasService("baz"));
+
+ pool.resolve("bar");
+ assertEquals(2, pool.getSize());
+ assertTrue(pool.hasService("foo"));
+ assertTrue(pool.hasService("bar"));
+ assertTrue(!pool.hasService("baz"));
+
+ pool.resolve("baz");
+ assertEquals(2, pool.getSize());
+ assertTrue(!pool.hasService("foo"));
+ assertTrue(pool.hasService("bar"));
+ assertTrue(pool.hasService("baz"));
+
+ pool.resolve("bar");
+ assertEquals(2, pool.getSize());
+ assertTrue(!pool.hasService("foo"));
+ assertTrue(pool.hasService("bar"));
+ assertTrue(pool.hasService("baz"));
+
+ pool.resolve("foo");
+ assertEquals(2, pool.getSize());
+ assertTrue(pool.hasService("foo"));
+ assertTrue(pool.hasService("bar"));
+ assertTrue(!pool.hasService("baz"));
+
+ slobrok.stop();
+ }
+}
\ No newline at end of file diff --git a/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/SlobrokTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/SlobrokTestCase.java new file mode 100644 index 00000000000..3f502d4da2f --- /dev/null +++ b/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/SlobrokTestCase.java @@ -0,0 +1,151 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.network.rpc; + + +import com.yahoo.jrt.ListenFailedException; +import com.yahoo.jrt.Spec; +import com.yahoo.jrt.slobrok.api.Mirror; +import com.yahoo.jrt.slobrok.server.Slobrok; +import com.yahoo.messagebus.network.Identity; + +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; + + +/** + * @author <a href="mailto:havardpe@yahoo-inc.com">Haavard Pettersen</a> + */ +public class SlobrokTestCase extends junit.framework.TestCase { + + private static class Res { + private List<Mirror.Entry> lst = new ArrayList<>(); + public Res add(String fullName, String spec) { + lst.add(new Mirror.Entry(fullName, spec)); + return this; + } + public Mirror.Entry[] toArray() { + return lst.toArray(new Mirror.Entry[lst.size()]); + } + } + + + public SlobrokTestCase(String message) { + super(message); + } + + Slobrok slobrok; + RPCNetwork net1; + RPCNetwork net2; + RPCNetwork net3; + int port1; + int port2; + int port3; + + void check(RPCNetwork net, String pattern, Mirror.Entry[] expect) { + Comparator<Mirror.Entry> cmp = new Comparator<Mirror.Entry>() { + public int compare(Mirror.Entry a, Mirror.Entry b) { + return a.compareTo(b); + } + }; + Arrays.sort(expect, cmp); + Mirror.Entry[] actual = null; + for (int i = 0; i < 1000; i++) { + actual = net.getMirror().lookup(pattern); + Arrays.sort(actual, cmp); + if (Arrays.equals(actual, expect)) { + System.out.printf("lookup successful for pattern: %s\n", pattern); + return; + } + try { Thread.sleep(10); } catch (InterruptedException e) { + // + } + } + System.out.printf("lookup failed for pattern: %s\n", pattern); + System.out.printf("actual values:\n"); + if (actual == null || actual.length == 0) { + System.out.printf(" { EMPTY }\n"); + } else { + for (Mirror.Entry entry : actual) { + System.out.printf(" { %s, %s }\n", entry.getName(), entry.getSpec()); + } + } + System.out.printf("expected values:\n"); + if (expect.length == 0) { + System.out.printf(" { EMPTY }\n"); + } else { + for (Mirror.Entry entry : expect) { + System.out.printf(" { %s, %s }\n", entry.getName(), entry.getSpec()); + } + } + assertTrue(false); + } + + public void setUp() throws ListenFailedException, UnknownHostException { + slobrok = new Slobrok(); + String slobrokCfgId = "raw:slobrok[1]\nslobrok[0].connectionspec \"" + new Spec("localhost", slobrok.port()).toString() + "\"\n"; + net1 = new RPCNetwork(new RPCNetworkParams().setIdentity(new Identity("net/a")).setSlobrokConfigId(slobrokCfgId)); + net2 = new RPCNetwork(new RPCNetworkParams().setIdentity(new Identity("net/b")).setSlobrokConfigId(slobrokCfgId)); + net3 = new RPCNetwork(new RPCNetworkParams().setIdentity(new Identity("net/c")).setSlobrokConfigId(slobrokCfgId)); + port1 = net1.getPort(); + port2 = net2.getPort(); + port3 = net3.getPort(); + } + + public void tearDown() { + net3.shutdown(); + net2.shutdown(); + net1.shutdown(); + slobrok.stop(); + } + + public void testSlobrok() { + net1.registerSession("foo"); + net2.registerSession("foo"); + net2.registerSession("bar"); + net3.registerSession("foo"); + net3.registerSession("bar"); + net3.registerSession("baz"); + + check(net1, "*/*/*", new Res() + .add("net/a/foo", net1.getConnectionSpec()) + .add("net/b/foo", net2.getConnectionSpec()) + .add("net/b/bar", net2.getConnectionSpec()) + .add("net/c/foo", net3.getConnectionSpec()) + .add("net/c/bar", net3.getConnectionSpec()) + .add("net/c/baz", net3.getConnectionSpec()).toArray()); + check(net2, "*/*/*", new Res() + .add("net/a/foo", net1.getConnectionSpec()) + .add("net/b/foo", net2.getConnectionSpec()) + .add("net/b/bar", net2.getConnectionSpec()) + .add("net/c/foo", net3.getConnectionSpec()) + .add("net/c/bar", net3.getConnectionSpec()) + .add("net/c/baz", net3.getConnectionSpec()).toArray()); + check(net3, "*/*/*", new Res() + .add("net/a/foo", net1.getConnectionSpec()) + .add("net/b/foo", net2.getConnectionSpec()) + .add("net/b/bar", net2.getConnectionSpec()) + .add("net/c/foo", net3.getConnectionSpec()) + .add("net/c/bar", net3.getConnectionSpec()) + .add("net/c/baz", net3.getConnectionSpec()).toArray()); + + net2.unregisterSession("bar"); + net3.unregisterSession("bar"); + net3.unregisterSession("baz"); + + check(net1, "*/*/*", new Res() + .add("net/a/foo", net1.getConnectionSpec()) + .add("net/b/foo", net2.getConnectionSpec()) + .add("net/c/foo", net3.getConnectionSpec()).toArray()); + check(net2, "*/*/*", new Res() + .add("net/a/foo", net1.getConnectionSpec()) + .add("net/b/foo", net2.getConnectionSpec()) + .add("net/c/foo", net3.getConnectionSpec()).toArray()); + check(net3, "*/*/*", new Res() + .add("net/a/foo", net1.getConnectionSpec()) + .add("net/b/foo", net2.getConnectionSpec()) + .add("net/c/foo", net3.getConnectionSpec()).toArray()); + } +} diff --git a/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/TargetPoolTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/TargetPoolTestCase.java new file mode 100755 index 00000000000..8d58b4dd89f --- /dev/null +++ b/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/TargetPoolTestCase.java @@ -0,0 +1,112 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.network.rpc;
+
+import com.yahoo.jrt.ListenFailedException;
+import com.yahoo.jrt.Supervisor;
+import com.yahoo.jrt.Transport;
+import com.yahoo.jrt.slobrok.server.Slobrok;
+import com.yahoo.concurrent.Timer;
+import com.yahoo.messagebus.network.rpc.test.TestServer;
+
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public class TargetPoolTestCase extends junit.framework.TestCase {
+
+ private Slobrok slobrok;
+ private List<TestServer> servers;
+ private Supervisor orb;
+
+ @Override
+ public void setUp() throws ListenFailedException {
+ slobrok = new Slobrok();
+ servers = new ArrayList<>();
+ orb = new Supervisor(new Transport());
+ }
+
+ @Override
+ public void tearDown() {
+ slobrok.stop();
+ for (TestServer server : servers) {
+ server.destroy();
+ }
+ orb.transport().shutdown().join();
+ }
+
+ public void testConnectionExpire() throws ListenFailedException, UnknownHostException {
+ // Necessary setup to be able to resolve targets.
+ RPCServiceAddress adr1 = registerServer();
+ RPCServiceAddress adr2 = registerServer();
+ RPCServiceAddress adr3 = registerServer();
+
+ PoolTimer timer = new PoolTimer();
+ RPCTargetPool pool = new RPCTargetPool(timer, 0.666);
+
+ // Assert that all connections expire.
+ RPCTarget target;
+ assertNotNull(target = pool.getTarget(orb, adr1)); target.subRef();
+ assertNotNull(target = pool.getTarget(orb, adr2)); target.subRef();
+ assertNotNull(target = pool.getTarget(orb, adr3)); target.subRef();
+ assertEquals(3, pool.size());
+ for (int i = 0; i < 10; ++i) {
+ pool.flushTargets(false);
+ assertEquals(3, pool.size());
+ }
+ timer.millis += 999;
+ pool.flushTargets(false);
+ assertEquals(0, pool.size());
+
+ // Assert that only idle connections expire.
+ assertNotNull(target = pool.getTarget(orb, adr1)); target.subRef();
+ assertNotNull(target = pool.getTarget(orb, adr2)); target.subRef();
+ assertNotNull(target = pool.getTarget(orb, adr3)); target.subRef();
+ assertEquals(3, pool.size());
+ timer.millis += 444;
+ pool.flushTargets(false);
+ assertEquals(3, pool.size());
+ assertNotNull(target = pool.getTarget(orb, adr2)); target.subRef();
+ assertNotNull(target = pool.getTarget(orb, adr3)); target.subRef();
+ assertEquals(3, pool.size());
+ timer.millis += 444;
+ pool.flushTargets(false);
+ assertEquals(2, pool.size());
+ assertNotNull(target = pool.getTarget(orb, adr3)); target.subRef();
+ timer.millis += 444;
+ pool.flushTargets(false);
+ assertEquals(1, pool.size());
+ timer.millis += 444;
+ pool.flushTargets(false);
+ assertEquals(0, pool.size());
+
+ // Assert that connections never expire while they are referenced.
+ assertNotNull(target = pool.getTarget(orb, adr1));
+ assertEquals(1, pool.size());
+ for (int i = 0; i < 10; ++i) {
+ timer.millis += 999;
+ pool.flushTargets(false);
+ assertEquals(1, pool.size());
+ }
+ target.subRef();
+ timer.millis += 999;
+ pool.flushTargets(false);
+ assertEquals(0, pool.size());
+ }
+
+ private RPCServiceAddress registerServer() throws ListenFailedException, UnknownHostException {
+ servers.add(new TestServer("srv" + servers.size(), null, slobrok, null, null));
+ return new RPCServiceAddress("foo/bar", servers.get(servers.size() - 1).mb.getConnectionSpec());
+ }
+
+ private static class PoolTimer implements Timer {
+ long millis = 0;
+
+ @Override
+ public long milliTime() {
+ return millis;
+ }
+ }
+}
\ No newline at end of file diff --git a/messagebus/src/test/java/com/yahoo/messagebus/routing/AdvancedRoutingTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/routing/AdvancedRoutingTestCase.java new file mode 100755 index 00000000000..62418c2766b --- /dev/null +++ b/messagebus/src/test/java/com/yahoo/messagebus/routing/AdvancedRoutingTestCase.java @@ -0,0 +1,119 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.routing; + +import com.yahoo.jrt.ListenFailedException; +import com.yahoo.jrt.slobrok.server.Slobrok; +import com.yahoo.messagebus.*; +import com.yahoo.messagebus.Error; +import com.yahoo.messagebus.network.Identity; +import com.yahoo.messagebus.network.rpc.RPCNetworkParams; +import com.yahoo.messagebus.network.rpc.test.TestServer; +import com.yahoo.messagebus.routing.test.CustomPolicyFactory; +import com.yahoo.messagebus.test.Receptor; +import com.yahoo.messagebus.test.SimpleMessage; +import com.yahoo.messagebus.test.SimpleProtocol; + +import java.net.UnknownHostException; + +/** + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +public class AdvancedRoutingTestCase extends junit.framework.TestCase { + + //////////////////////////////////////////////////////////////////////////////// + // + // Setup + // + //////////////////////////////////////////////////////////////////////////////// + + Slobrok slobrok; + TestServer srcServer, dstServer; + SourceSession srcSession; + DestinationSession dstFoo, dstBar, dstBaz; + + @Override + public void setUp() throws ListenFailedException, UnknownHostException { + slobrok = new Slobrok(); + dstServer = new TestServer(new MessageBusParams().addProtocol(new SimpleProtocol()), + new RPCNetworkParams().setIdentity(new Identity("dst")).setSlobrokConfigId(TestServer.getSlobrokConfig(slobrok))); + dstFoo = dstServer.mb.createDestinationSession(new DestinationSessionParams().setName("foo").setMessageHandler(new Receptor())); + dstBar = dstServer.mb.createDestinationSession(new DestinationSessionParams().setName("bar").setMessageHandler(new Receptor())); + dstBaz = dstServer.mb.createDestinationSession(new DestinationSessionParams().setName("baz").setMessageHandler(new Receptor())); + srcServer = new TestServer(new MessageBusParams().setRetryPolicy(new RetryTransientErrorsPolicy().setBaseDelay(0)).addProtocol(new SimpleProtocol()), + new RPCNetworkParams().setSlobrokConfigId(TestServer.getSlobrokConfig(slobrok))); + srcSession = srcServer.mb.createSourceSession( + new SourceSessionParams().setTimeout(600.0).setReplyHandler(new Receptor())); + assertTrue(srcServer.waitSlobrok("dst/*", 3)); + } + + @Override + public void tearDown() { + slobrok.stop(); + dstFoo.destroy(); + dstBar.destroy(); + dstBaz.destroy(); + dstServer.destroy(); + srcSession.destroy(); + srcServer.destroy(); + } + + //////////////////////////////////////////////////////////////////////////////// + // + // Tests + // + //////////////////////////////////////////////////////////////////////////////// + + public void testAdvanced() { + SimpleProtocol protocol = new SimpleProtocol(); + protocol.addPolicyFactory("Custom", new CustomPolicyFactory(false, ErrorCode.NO_ADDRESS_FOR_SERVICE)); + srcServer.mb.putProtocol(protocol); + srcServer.setupRouting(new RoutingTableSpec(SimpleProtocol.NAME) + .addHop(new HopSpec("bar", "dst/bar")) + .addHop(new HopSpec("baz", "dst/baz")) + .addRoute(new RouteSpec("baz").addHop("baz"))); + Message msg = new SimpleMessage("msg"); + msg.getTrace().setLevel(9); + assertTrue(srcSession.send(msg, Route.parse("[Custom:" + dstFoo.getConnectionSpec() + ",bar,route:baz,dst/cox,?dst/unknown]")).isAccepted()); + + // Initial send. + assertNotNull(msg = ((Receptor)dstFoo.getMessageHandler()).getMessage(60)); + dstFoo.acknowledge(msg); + assertNotNull(msg = ((Receptor)dstBar.getMessageHandler()).getMessage(60)); + Reply reply = new EmptyReply(); + reply.swapState(msg); + reply.addError(new Error(ErrorCode.TRANSIENT_ERROR, "bar")); + dstBar.reply(reply); + assertNotNull(msg = ((Receptor)dstBaz.getMessageHandler()).getMessage(60)); + reply = new EmptyReply(); + reply.swapState(msg); + reply.addError(new Error(ErrorCode.TRANSIENT_ERROR, "baz1")); + dstBaz.reply(reply); + + // First retry. + assertNull(((Receptor)dstFoo.getMessageHandler()).getMessage(0)); + assertNotNull(msg = ((Receptor)dstBar.getMessageHandler()).getMessage(60)); + dstBar.acknowledge(msg); + assertNotNull(msg = ((Receptor)dstBaz.getMessageHandler()).getMessage(60)); + reply = new EmptyReply(); + reply.swapState(msg); + reply.addError(new Error(ErrorCode.TRANSIENT_ERROR, "baz2")); + dstBaz.reply(reply); + + // Second retry. + assertNull(((Receptor)dstFoo.getMessageHandler()).getMessage(0)); + assertNull(((Receptor)dstBar.getMessageHandler()).getMessage(0)); + assertNotNull(msg = ((Receptor)dstBaz.getMessageHandler()).getMessage(60)); + reply = new EmptyReply(); + reply.swapState(msg); + reply.addError(new Error(ErrorCode.FATAL_ERROR, "baz3")); + dstBaz.reply(reply); + + // Done. + reply = ((Receptor)srcSession.getReplyHandler()).getReply(60); + assertNotNull(reply); + System.out.println(reply.getTrace()); + assertEquals(2, reply.getNumErrors()); + assertEquals(ErrorCode.FATAL_ERROR, reply.getError(0).getCode()); + assertEquals(ErrorCode.NO_ADDRESS_FOR_SERVICE, reply.getError(1).getCode()); + } +} diff --git a/messagebus/src/test/java/com/yahoo/messagebus/routing/ResenderTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/routing/ResenderTestCase.java new file mode 100755 index 00000000000..48993343f38 --- /dev/null +++ b/messagebus/src/test/java/com/yahoo/messagebus/routing/ResenderTestCase.java @@ -0,0 +1,200 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.routing; + +import com.yahoo.jrt.ListenFailedException; +import com.yahoo.jrt.slobrok.server.Slobrok; +import com.yahoo.messagebus.*; +import com.yahoo.messagebus.Error; +import com.yahoo.messagebus.network.Identity; +import com.yahoo.messagebus.network.rpc.RPCNetworkParams; +import com.yahoo.messagebus.network.rpc.test.TestServer; +import com.yahoo.messagebus.test.Receptor; +import com.yahoo.messagebus.test.SimpleMessage; +import com.yahoo.messagebus.test.SimpleProtocol; + +import java.net.UnknownHostException; + +/** + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +public class ResenderTestCase extends junit.framework.TestCase { + + //////////////////////////////////////////////////////////////////////////////// + // + // Setup + // + //////////////////////////////////////////////////////////////////////////////// + + Slobrok slobrok; + TestServer srcServer, dstServer; + SourceSession srcSession; + DestinationSession dstSession; + RetryTransientErrorsPolicy retryPolicy; + + @Override + public void setUp() throws ListenFailedException, UnknownHostException { + slobrok = new Slobrok(); + dstServer = new TestServer(new MessageBusParams().addProtocol(new SimpleProtocol()), + new RPCNetworkParams().setIdentity(new Identity("dst")).setSlobrokConfigId(TestServer.getSlobrokConfig(slobrok))); + dstSession = dstServer.mb.createDestinationSession(new DestinationSessionParams().setName("session").setMessageHandler(new Receptor())); + retryPolicy = new RetryTransientErrorsPolicy(); + retryPolicy.setBaseDelay(0); + srcServer = new TestServer(new MessageBusParams().setRetryPolicy(retryPolicy).addProtocol(new SimpleProtocol()), + new RPCNetworkParams().setSlobrokConfigId(TestServer.getSlobrokConfig(slobrok))); + srcSession = srcServer.mb.createSourceSession( + new SourceSessionParams().setTimeout(600.0).setReplyHandler(new Receptor())); + assertTrue(srcServer.waitSlobrok("dst/session", 1)); + } + + @Override + public void tearDown() { + slobrok.stop(); + dstSession.destroy(); + dstServer.destroy(); + srcSession.destroy(); + srcServer.destroy(); + } + + //////////////////////////////////////////////////////////////////////////////// + // + // Tests + // + //////////////////////////////////////////////////////////////////////////////// + + public void testRetryTag() { + assertTrue(srcSession.send(createMessage("msg"), Route.parse("dst/session")).isAccepted()); + Message msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60); + assertNotNull(msg); + for (int i = 0; i < 5; ++i) { + assertEquals(i, msg.getRetry()); + assertEquals(true, msg.getRetryEnabled()); + replyFromDestination(msg, ErrorCode.APP_TRANSIENT_ERROR, 0); + assertNotNull(msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60)); + } + dstSession.acknowledge(msg); + Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60); + assertNotNull(reply); + assertFalse(reply.hasErrors()); + assertNull(((Receptor)dstSession.getMessageHandler()).getMessage(0)); + System.out.println(reply.getTrace()); + } + + public void testRetryEnabledTag() { + Message msg = createMessage("msg"); + msg.setRetryEnabled(false); + assertTrue(srcSession.send(msg, Route.parse("dst/session")).isAccepted()); + assertNotNull(msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60)); + assertEquals(false, msg.getRetryEnabled()); + replyFromDestination(msg, ErrorCode.APP_TRANSIENT_ERROR, 0); + Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60); + assertNotNull(reply); + assertTrue(reply.hasErrors()); + assertNull(((Receptor)dstSession.getMessageHandler()).getMessage(0)); + System.out.println(reply.getTrace()); + } + + public void testTransientError() { + assertTrue(srcSession.send(createMessage("msg"), Route.parse("dst/session")).isAccepted()); + Message msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60); + assertNotNull(msg); + replyFromDestination(msg, ErrorCode.APP_TRANSIENT_ERROR, 0); + assertNotNull(msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60)); + replyFromDestination(msg, ErrorCode.APP_FATAL_ERROR, 0); + Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60); + assertNotNull(reply); + assertTrue(reply.hasFatalErrors()); + assertNull(((Receptor)dstSession.getMessageHandler()).getMessage(0)); + } + + public void testFatalError() { + assertTrue(srcSession.send(createMessage("msg"), Route.parse("dst/session")).isAccepted()); + Message msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60); + assertNotNull(msg); + replyFromDestination(msg, ErrorCode.APP_FATAL_ERROR, 0); + Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60); + assertNotNull(reply); + assertTrue(reply.hasFatalErrors()); + assertNull(((Receptor)dstSession.getMessageHandler()).getMessage(0)); + } + + public void testDisableRetry() { + retryPolicy.setEnabled(false); + assertTrue(srcSession.send(createMessage("msg"), Route.parse("dst/session")).isAccepted()); + Message msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60); + assertNotNull(msg); + replyFromDestination(msg, ErrorCode.APP_TRANSIENT_ERROR, 0); + Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60); + assertNotNull(reply); + assertTrue(reply.hasErrors()); + assertTrue(!reply.hasFatalErrors()); + assertNull(((Receptor)dstSession.getMessageHandler()).getMessage(0)); + } + + public void testRetryDelay() { + retryPolicy.setBaseDelay(0.01); + assertTrue(srcSession.send(createMessage("msg"), Route.parse("dst/session")).isAccepted()); + Message msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60); + assertNotNull(msg); + for (int i = 0; i < 5; ++i) { + replyFromDestination(msg, ErrorCode.APP_TRANSIENT_ERROR, -1); + assertNotNull(msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60)); + } + replyFromDestination(msg, ErrorCode.APP_FATAL_ERROR, 0); + Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60); + assertNotNull(reply); + assertTrue(reply.hasFatalErrors()); + assertNull(((Receptor)dstSession.getMessageHandler()).getMessage(0)); + + String trace = reply.getTrace().toString(); + assertTrue(trace.contains("retry 1 in 0.01")); + assertTrue(trace.contains("retry 2 in 0.02")); + assertTrue(trace.contains("retry 3 in 0.03")); + assertTrue(trace.contains("retry 4 in 0.04")); + assertTrue(trace.contains("retry 5 in 0.05")); + } + + public void testRequestRetryDelay() { + assertTrue(srcSession.send(createMessage("msg"), Route.parse("dst/session")).isAccepted()); + Message msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60); + assertNotNull(msg); + for (int i = 0; i < 5; ++i) { + replyFromDestination(msg, ErrorCode.APP_TRANSIENT_ERROR, i / 50.0); + assertNotNull(msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60)); + } + replyFromDestination(msg, ErrorCode.APP_FATAL_ERROR, 0); + Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60); + assertNotNull(reply); + assertTrue(reply.hasFatalErrors()); + assertNull(((Receptor)dstSession.getMessageHandler()).getMessage(0)); + + String trace = reply.getTrace().toString(); + System.out.println(trace); + assertTrue(trace.contains("retry 1 in 0")); + assertTrue(trace.contains("retry 2 in 0.02")); + assertTrue(trace.contains("retry 3 in 0.04")); + assertTrue(trace.contains("retry 4 in 0.06")); + assertTrue(trace.contains("retry 5 in 0.08")); + } + + //////////////////////////////////////////////////////////////////////////////// + // + // Utilities + // + //////////////////////////////////////////////////////////////////////////////// + + private static Message createMessage(String msg) { + SimpleMessage ret = new SimpleMessage(msg); + ret.getTrace().setLevel(9); + return ret; + } + + private void replyFromDestination(Message msg, int errorCode, double retryDelay) { + Reply reply = new EmptyReply(); + reply.swapState(msg); + if (errorCode != ErrorCode.NONE) { + reply.addError(new Error(errorCode, "err")); + } + reply.setRetryDelay(retryDelay); + dstSession.reply(reply); + } +} diff --git a/messagebus/src/test/java/com/yahoo/messagebus/routing/RetryPolicyTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/routing/RetryPolicyTestCase.java new file mode 100644 index 00000000000..ad13f3325c7 --- /dev/null +++ b/messagebus/src/test/java/com/yahoo/messagebus/routing/RetryPolicyTestCase.java @@ -0,0 +1,32 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.routing;
+
+import com.yahoo.messagebus.ErrorCode;
+import junit.framework.TestCase;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public class RetryPolicyTestCase extends TestCase {
+
+ public void testSimpleRetryPolicy() {
+ RetryTransientErrorsPolicy policy = new RetryTransientErrorsPolicy();
+ for (int i = 0; i < 5; ++i) {
+ double delay = i / 3.0;
+ policy.setBaseDelay(delay);
+ for (int j = 0; j < 5; ++j) {
+ assertEquals((int)(j * delay), (int)policy.getRetryDelay(j));
+ }
+ for (int j = ErrorCode.NONE; j < ErrorCode.ERROR_LIMIT; ++j) {
+ policy.setEnabled(true);
+ if (j < ErrorCode.FATAL_ERROR) {
+ assertTrue(policy.canRetry(j));
+ } else {
+ assertFalse(policy.canRetry(j));
+ }
+ policy.setEnabled(false);
+ assertFalse(policy.canRetry(j));
+ }
+ }
+ }
+}
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/routing/RouteParserTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/routing/RouteParserTestCase.java new file mode 100755 index 00000000000..e4d120271bc --- /dev/null +++ b/messagebus/src/test/java/com/yahoo/messagebus/routing/RouteParserTestCase.java @@ -0,0 +1,168 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.routing; + +/** + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +public class RouteParserTestCase extends junit.framework.TestCase { + + public void testHopParser() { + Hop hop = Hop.parse("foo"); + assertNotNull(hop); + assertEquals(1, hop.getNumDirectives()); + assertVerbatimDirective(hop.getDirective(0), "foo"); + + assertNotNull(hop = Hop.parse("foo/bar")); + assertEquals(2, hop.getNumDirectives()); + assertVerbatimDirective(hop.getDirective(0), "foo"); + assertVerbatimDirective(hop.getDirective(1), "bar"); + + assertNotNull(hop = Hop.parse("tcp/foo:666/bar")); + assertEquals(1, hop.getNumDirectives()); + assertTcpDirective(hop.getDirective(0), "foo", 666, "bar"); + + assertNotNull(hop = Hop.parse("route:foo")); + assertEquals(1, hop.getNumDirectives()); + assertRouteDirective(hop.getDirective(0), "foo"); + + assertNotNull(hop = Hop.parse("[Extern:tcp/localhost:3619;foo/bar]")); + assertEquals(1, hop.getNumDirectives()); + assertPolicyDirective(hop.getDirective(0), "Extern","tcp/localhost:3619;foo/bar"); + + assertNotNull(hop = Hop.parse("[AND:foo bar]")); + assertEquals(1, hop.getNumDirectives()); + assertPolicyDirective(hop.getDirective(0), "AND","foo bar"); + + assertNotNull(hop = Hop.parse("[DocumentRouteSelector:raw:route[2]\n" + + "route[0].name \"foo\"\n" + + "route[0].selector \"testdoc\"\n" + + "route[0].feed \"myfeed\"\n" + + "route[1].name \"bar\"\n" + + "route[1].selector \"other\"\n" + + "route[1].feed \"myfeed\"\n" + + "]")); + assertEquals(1, hop.getNumDirectives()); + assertPolicyDirective(hop.getDirective(0), "DocumentRouteSelector", + "raw:route[2]\n" + + "route[0].name \"foo\"\n" + + "route[0].selector \"testdoc\"\n" + + "route[0].feed \"myfeed\"\n" + + "route[1].name \"bar\"\n" + + "route[1].selector \"other\"\n" + + "route[1].feed \"myfeed\""); + + assertNotNull(hop = Hop.parse("[DocumentRouteSelector:raw:route[1]\n" + + "route[0].name \"docproc/cluster.foo\"\n" + + "route[0].selector \"testdoc\"\n" + + "route[0].feed \"myfeed\"" + + "]")); + assertEquals(1, hop.getNumDirectives()); + assertPolicyDirective(hop.getDirective(0), "DocumentRouteSelector", + "raw:route[1]\n" + + "route[0].name \"docproc/cluster.foo\"\n" + + "route[0].selector \"testdoc\"\n" + + "route[0].feed \"myfeed\""); + } + + public void testHopParserErrors() { + assertError(Hop.parse(""), "Failed to parse empty string."); + assertError(Hop.parse("[foo"), "Unterminated '[' in '[foo'"); + assertError(Hop.parse("foo/[bar]]"), "Unexpected token ']' in 'foo/[bar]]'"); + assertError(Hop.parse("foo bar"), "Failed to completely parse 'foo bar'."); + } + + public void testShortRoute() { + Route shortRoute = Route.parse("c"); + assertNotNull(shortRoute); + assertEquals(1, shortRoute.getNumHops()); + Hop hop = shortRoute.getHop(0); + assertNotNull(hop); + assertEquals(1, hop.getNumDirectives()); + assertVerbatimDirective(hop.getDirective(0), "c"); + } + + public void testShortHops() { + Route shortRoute = Route.parse("a b c"); + assertNotNull(shortRoute); + assertEquals(3, shortRoute.getNumHops()); + Hop hop = shortRoute.getHop(0); + assertNotNull(hop); + assertEquals(1, hop.getNumDirectives()); + assertVerbatimDirective(hop.getDirective(0), "a"); + } + + public void testRouteParser() { + Route route = Route.parse("foo bar/baz"); + assertNotNull(route); + assertEquals(2, route.getNumHops()); + Hop hop = route.getHop(0); + assertNotNull(hop); + assertEquals(1, hop.getNumDirectives()); + assertVerbatimDirective(hop.getDirective(0), "foo"); + assertNotNull(hop = route.getHop(1)); + assertEquals(2, hop.getNumDirectives()); + assertVerbatimDirective(hop.getDirective(0), "bar"); + assertVerbatimDirective(hop.getDirective(1), "baz"); + + assertNotNull(route = Route.parse("[Extern:tcp/localhost:3633;itr/session] default")); + assertEquals(2, route.getNumHops()); + assertNotNull(hop = route.getHop(0)); + assertEquals(1, hop.getNumDirectives()); + assertPolicyDirective(hop.getDirective(0), "Extern", "tcp/localhost:3633;itr/session"); + assertNotNull(hop = route.getHop(1)); + assertEquals(1, hop.getNumDirectives()); + assertVerbatimDirective(hop.getDirective(0), "default"); + } + + public void testRouteParserErrors() { + assertError(Route.parse(""), "Failed to parse empty string."); + assertError(Route.parse("foo [bar"), "Unterminated '[' in '[bar'"); + assertError(Route.parse("foo bar/[baz]]"), "Unexpected token ']' in 'bar/[baz]]'"); + } + + private static void assertError(Route route, String msg) { + assertNotNull(route); + assertEquals(1, route.getNumHops()); + assertError(route.getHop(0), msg); + } + + private static void assertError(Hop hop, String msg) { + assertNotNull(hop); + System.out.println(hop.toDebugString()); + assertEquals(1, hop.getNumDirectives()); + assertErrorDirective(hop.getDirective(0), msg); + } + + private static void assertErrorDirective(HopDirective dir, String msg) { + assertNotNull(dir); + assertTrue(dir instanceof ErrorDirective); + assertEquals(msg, ((ErrorDirective)dir).getMessage()); + } + + private static void assertPolicyDirective(HopDirective dir, String name, String param) { + assertNotNull(dir); + assertTrue(dir instanceof PolicyDirective); + assertEquals(name, ((PolicyDirective)dir).getName()); + assertEquals(param, ((PolicyDirective)dir).getParam()); + } + + private static void assertRouteDirective(HopDirective dir, String name) { + assertNotNull(dir); + assertTrue(dir instanceof RouteDirective); + assertEquals(name, ((RouteDirective)dir).getName()); + } + + private static void assertTcpDirective(HopDirective dir, String host, int port, String session) { + assertNotNull(dir); + assertTrue(dir instanceof TcpDirective); + assertEquals(host, ((TcpDirective)dir).getHost()); + assertEquals(port, ((TcpDirective)dir).getPort()); + assertEquals(session, ((TcpDirective)dir).getSession()); + } + + private static void assertVerbatimDirective(HopDirective dir, String image) { + assertNotNull(dir); + assertTrue(dir instanceof VerbatimDirective); + assertEquals(image, ((VerbatimDirective)dir).getImage()); + } +} diff --git a/messagebus/src/test/java/com/yahoo/messagebus/routing/RoutingContextTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/routing/RoutingContextTestCase.java new file mode 100755 index 00000000000..ea217af5b9a --- /dev/null +++ b/messagebus/src/test/java/com/yahoo/messagebus/routing/RoutingContextTestCase.java @@ -0,0 +1,258 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.routing; + +import com.yahoo.jrt.ListenFailedException; +import com.yahoo.jrt.slobrok.server.Slobrok; +import com.yahoo.messagebus.*; +import com.yahoo.messagebus.Error; +import com.yahoo.messagebus.network.Identity; +import com.yahoo.messagebus.network.rpc.RPCNetworkParams; +import com.yahoo.messagebus.network.rpc.test.TestServer; +import com.yahoo.messagebus.test.Receptor; +import com.yahoo.messagebus.test.SimpleMessage; +import com.yahoo.messagebus.test.SimpleProtocol; + +import java.net.UnknownHostException; +import java.util.Arrays; +import java.util.List; + +/** + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +public class RoutingContextTestCase extends junit.framework.TestCase { + public static final int TIMEOUT_SECS = 120; + + //////////////////////////////////////////////////////////////////////////////// + // + // Setup + // + //////////////////////////////////////////////////////////////////////////////// + + Slobrok slobrok; + TestServer srcServer, dstServer; + SourceSession srcSession; + DestinationSession dstSession; + + @Override + public void setUp() throws ListenFailedException, UnknownHostException { + slobrok = new Slobrok(); + dstServer = new TestServer(new MessageBusParams().addProtocol(new SimpleProtocol()), + new RPCNetworkParams().setIdentity(new Identity("dst")).setSlobrokConfigId(TestServer.getSlobrokConfig(slobrok))); + dstSession = dstServer.mb.createDestinationSession(new DestinationSessionParams().setName("session").setMessageHandler(new Receptor())); + srcServer = new TestServer(new MessageBusParams().setRetryPolicy(new RetryTransientErrorsPolicy().setBaseDelay(0)).addProtocol(new SimpleProtocol()), + new RPCNetworkParams().setSlobrokConfigId(TestServer.getSlobrokConfig(slobrok))); + srcSession = srcServer.mb.createSourceSession( + new SourceSessionParams().setTimeout(600.0).setReplyHandler(new Receptor())); + assertTrue(srcServer.waitSlobrok("dst/session", 1)); + } + + @Override + public void tearDown() { + slobrok.stop(); + dstSession.destroy(); + dstServer.destroy(); + srcSession.destroy(); + srcServer.destroy(); + } + + //////////////////////////////////////////////////////////////////////////////// + // + // Tests + // + //////////////////////////////////////////////////////////////////////////////// + + public void testSingleDirective() { + SimpleProtocol protocol = new SimpleProtocol(); + protocol.addPolicyFactory("Custom", new CustomPolicyFactory( + false, + Arrays.asList("foo", "bar", "baz/cox"), + Arrays.asList("foo", "bar"))); + srcServer.mb.putProtocol(protocol); + srcServer.setupRouting(new RoutingTableSpec(SimpleProtocol.NAME) + .addRoute(new RouteSpec("myroute").addHop("myhop")) + .addHop(new HopSpec("myhop", "[Custom]") + .addRecipient("foo").addRecipient("bar").addRecipient("baz/cox"))); + for (int i = 0; i < 2; ++i) { + assertTrue(srcSession.send(createMessage("msg"), "myroute").isAccepted()); + Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(TIMEOUT_SECS); + assertNotNull(reply); + System.out.println(reply.getTrace()); + assertFalse(reply.hasErrors()); + } + } + + public void testMoreDirectives() { + SimpleProtocol protocol = new SimpleProtocol(); + protocol.addPolicyFactory("Custom", new CustomPolicyFactory( + false, + Arrays.asList("foo", "foo/bar", "foo/bar0/baz", "foo/bar1/baz", "foo/bar/baz/cox"), + Arrays.asList("foo/bar0/baz", "foo/bar1/baz"))); + srcServer.mb.putProtocol(protocol); + srcServer.setupRouting(new RoutingTableSpec(SimpleProtocol.NAME) + .addRoute(new RouteSpec("myroute").addHop("myhop")) + .addHop(new HopSpec("myhop", "foo/[Custom]/baz") + .addRecipient("foo").addRecipient("foo/bar") + .addRecipient("foo/bar0/baz").addRecipient("foo/bar1/baz") + .addRecipient("foo/bar/baz/cox"))); + for (int i = 0; i < 2; ++i) { + assertTrue(srcSession.send(createMessage("msg"), "myroute").isAccepted()); + Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(TIMEOUT_SECS); + assertNotNull(reply); + System.out.println(reply.getTrace()); + assertFalse(reply.hasErrors()); + } + } + + public void testRecipientsRemain() { + SimpleProtocol protocol = new SimpleProtocol(); + protocol.addPolicyFactory("First", new CustomPolicyFactory(true, Arrays.asList("foo/bar"), Arrays.asList("foo/[Second]"))); + protocol.addPolicyFactory("Second", new CustomPolicyFactory(false, Arrays.asList("foo/bar"), Arrays.asList("foo/bar"))); + srcServer.mb.putProtocol(protocol); + srcServer.setupRouting(new RoutingTableSpec(SimpleProtocol.NAME) + .addRoute(new RouteSpec("myroute").addHop("myhop")) + .addHop(new HopSpec("myhop", "[First]/[Second]") + .addRecipient("foo/bar"))); + for (int i = 0; i < 2; ++i) { + assertTrue(srcSession.send(createMessage("msg"), "myroute").isAccepted()); + Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(TIMEOUT_SECS); + assertNotNull(reply); + System.out.println(reply.getTrace()); + assertFalse(reply.hasErrors()); + } + } + + public void testConstRoute() { + SimpleProtocol protocol = new SimpleProtocol(); + protocol.addPolicyFactory("DocumentRouteSelector", + new CustomPolicyFactory(true, Arrays.asList("dst"), Arrays.asList("dst"))); + srcServer.mb.putProtocol(protocol); + srcServer.setupRouting(new RoutingTableSpec(SimpleProtocol.NAME) + .addRoute(new RouteSpec("default").addHop("indexing")) + .addHop(new HopSpec("indexing", "[DocumentRouteSelector]").addRecipient("dst")) + .addHop(new HopSpec("dst", "dst/session"))); + for (int i = 0; i < 2; ++i) { + assertTrue(srcSession.send(createMessage("msg"), Route.parse("route:default")).isAccepted()); + Message msg = ((Receptor)dstSession.getMessageHandler()).getMessage(TIMEOUT_SECS); + assertNotNull(msg); + dstSession.acknowledge(msg); + Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(TIMEOUT_SECS); + assertNotNull(reply); + System.out.println(reply.getTrace()); + assertFalse(reply.hasErrors()); + } + } + + //////////////////////////////////////////////////////////////////////////////// + // + // Utilities + // + //////////////////////////////////////////////////////////////////////////////// + + private Message createMessage(String msg) { + Message ret = new SimpleMessage(msg); + ret.getTrace().setLevel(9); + return ret; + } + + private static class CustomPolicyFactory implements SimpleProtocol.PolicyFactory { + + final boolean forward; + final List<String> expectedAll; + final List<String> expectedMatched; + + public CustomPolicyFactory(boolean forward, List<String> all, List<String> matched) { + this.forward = forward; + this.expectedAll = all; + this.expectedMatched = matched; + } + + public RoutingPolicy create(String param) { + return new CustomPolicy(this); + } + } + + private static class CustomPolicy implements RoutingPolicy { + + CustomPolicyFactory factory; + + public CustomPolicy(CustomPolicyFactory factory) { + this.factory = factory; + } + + public void select(RoutingContext ctx) { + Reply reply = new EmptyReply(); + reply.getTrace().setLevel(9); + + List<Route> recipients = ctx.getAllRecipients(); + if (factory.expectedAll.size() == recipients.size()) { + ctx.trace(1, "Got " + recipients.size() + " expected recipients."); + for (Route route : recipients) { + if (factory.expectedAll.contains(route.toString())) { + ctx.trace(1, "Got expected recipient '" + route + "'."); + } else { + reply.addError(new Error(ErrorCode.APP_FATAL_ERROR, + "Recipient '" + route + "' not expected.")); + } + } + } else { + reply.addError(new Error(ErrorCode.APP_FATAL_ERROR, + "Expected " + factory.expectedAll.size() + " recipients, got " + recipients.size() + ".")); + } + + if (ctx.getNumRecipients() == recipients.size()) { + for (int i = 0; i < recipients.size(); ++i) { + if (recipients.get(i) == ctx.getRecipient(i)) { + ctx.trace(1, "getRecipient(" + i + ") matches getAllRecipients().get(" + i + ")"); + } else { + reply.addError(new Error(ErrorCode.APP_FATAL_ERROR, + "getRecipient(" + i + ") differs from getAllRecipients().get(" + i + ")")); + } + } + } else { + reply.addError(new Error(ErrorCode.APP_FATAL_ERROR, + "getNumRecipients() differs from getAllRecipients().size()")); + } + + recipients = ctx.getMatchedRecipients(); + if (factory.expectedMatched.size() == recipients.size()) { + ctx.trace(1, "Got " + recipients.size() + " matched recipients."); + for (Route route : recipients) { + if (factory.expectedMatched.contains(route.toString())) { + ctx.trace(1, "Got matched recipient '" + route + "'."); + } else { + reply.addError(new Error(ErrorCode.APP_FATAL_ERROR, + "Matched recipient '" + route + "' not expected.")); + } + } + } else { + reply.addError(new Error(ErrorCode.APP_FATAL_ERROR, + "Expected " + factory.expectedAll.size() + " matched recipients, got " + recipients.size() + ".")); + } + + if (!reply.hasErrors() && factory.forward) { + for (Route route : recipients) { + ctx.addChild(route); + } + } else { + ctx.setReply(reply); + } + } + + public void merge(RoutingContext ctx) { + Reply ret = new EmptyReply(); + for (RoutingNodeIterator it = ctx.getChildIterator(); + it.isValid(); it.next()) { + Reply reply = it.getReplyRef(); + for (int i = 0; i < reply.getNumErrors(); ++i) { + ret.addError(reply.getError(i)); + } + } + ctx.setReply(ret); + } + + @Override + public void destroy() { + } + } + +} diff --git a/messagebus/src/test/java/com/yahoo/messagebus/routing/RoutingSpecTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/routing/RoutingSpecTestCase.java new file mode 100755 index 00000000000..3cbb9d86e44 --- /dev/null +++ b/messagebus/src/test/java/com/yahoo/messagebus/routing/RoutingSpecTestCase.java @@ -0,0 +1,336 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.routing;
+
+import com.yahoo.messagebus.ConfigAgent;
+import com.yahoo.messagebus.ConfigHandler;
+import junit.framework.TestCase;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public class RoutingSpecTestCase extends TestCase {
+
+ ////////////////////////////////////////////////////////////////////////////////
+ //
+ // Tests
+ //
+ ////////////////////////////////////////////////////////////////////////////////
+
+ public void testConfig() {
+ assertConfig(new RoutingSpec());
+ assertConfig(new RoutingSpec().addTable(new RoutingTableSpec("mytable1")));
+ assertConfig(new RoutingSpec().addTable(new RoutingTableSpec("mytable1")
+ .addHop(new HopSpec("myhop1", "myselector1"))));
+ assertConfig(new RoutingSpec().addTable(new RoutingTableSpec("mytable1")
+ .addHop(new HopSpec("myhop1", "myselector1"))
+ .addRoute(new RouteSpec("myroute1").addHop("myhop1"))));
+ assertConfig(new RoutingSpec().addTable(new RoutingTableSpec("mytable1")
+ .addHop(new HopSpec("myhop1", "myselector1"))
+ .addHop(new HopSpec("myhop2", "myselector2"))
+ .addRoute(new RouteSpec("myroute1").addHop("myhop1"))
+ .addRoute(new RouteSpec("myroute2").addHop("myhop2"))
+ .addRoute(new RouteSpec("myroute12").addHop("myhop1").addHop("myhop2"))));
+ assertConfig(new RoutingSpec().addTable(new RoutingTableSpec("mytable1")
+ .addHop(new HopSpec("myhop1", "myselector1"))
+ .addHop(new HopSpec("myhop2", "myselector2"))
+ .addRoute(new RouteSpec("myroute1").addHop("myhop1"))
+ .addRoute(new RouteSpec("myroute2").addHop("myhop2"))
+ .addRoute(new RouteSpec("myroute12").addHop("myhop1").addHop("myhop2")))
+ .addTable(new RoutingTableSpec("mytable2")));
+ assertEquals("routingtable[2]\n" +
+ "routingtable[0].protocol \"mytable1\"\n" +
+ "routingtable[1].protocol \"mytable2\"\n" +
+ "routingtable[1].hop[3]\n" +
+ "routingtable[1].hop[0].name \"myhop1\"\n" +
+ "routingtable[1].hop[0].selector \"myselector1\"\n" +
+ "routingtable[1].hop[1].name \"myhop2\"\n" +
+ "routingtable[1].hop[1].selector \"myselector2\"\n" +
+ "routingtable[1].hop[1].ignoreresult true\n" +
+ "routingtable[1].hop[2].name \"myhop1\"\n" +
+ "routingtable[1].hop[2].selector \"myselector3\"\n" +
+ "routingtable[1].hop[2].recipient[2]\n" +
+ "routingtable[1].hop[2].recipient[0] \"myrecipient1\"\n" +
+ "routingtable[1].hop[2].recipient[1] \"myrecipient2\"\n" +
+ "routingtable[1].route[1]\n" +
+ "routingtable[1].route[0].name \"myroute1\"\n" +
+ "routingtable[1].route[0].hop[1]\n" +
+ "routingtable[1].route[0].hop[0] \"myhop1\"\n",
+ new RoutingSpec()
+ .addTable(new RoutingTableSpec("mytable1"))
+ .addTable(new RoutingTableSpec("mytable2")
+ .addHop(new HopSpec("myhop1", "myselector1"))
+ .addHop(new HopSpec("myhop2", "myselector2").setIgnoreResult(true))
+ .addHop(new HopSpec("myhop1", "myselector3")
+ .addRecipient("myrecipient1")
+ .addRecipient("myrecipient2"))
+ .addRoute(new RouteSpec("myroute1").addHop("myhop1"))).toString());
+ }
+
+ public void testApplicationSpec() {
+ assertApplicationSpec(Arrays.asList("foo"),
+ Arrays.asList("foo",
+ "*"));
+ assertApplicationSpec(Arrays.asList("foo/bar"),
+ Arrays.asList("foo/bar",
+ "foo/*",
+ "*/bar",
+ "*/*"));
+ assertApplicationSpec(Arrays.asList("foo/0/baz",
+ "foo/1/baz",
+ "foo/2/baz"),
+ Arrays.asList("foo/0/baz",
+ "foo/1/baz",
+ "foo/2/baz",
+ "foo/0/*",
+ "foo/1/*",
+ "foo/2/*",
+ "foo/*/baz",
+ "*/0/baz",
+ "*/1/baz",
+ "*/2/baz",
+ "foo/*/*",
+ "*/0/*",
+ "*/1/*",
+ "*/2/*",
+ "*/*/baz",
+ "*/*/*"));
+ }
+
+ public void testVeriyfOk() {
+ assertVerifyOk(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
+ .addHop(new HopSpec("hop1", "myservice1"))),
+ new ApplicationSpec().addService("mytable", "myservice1"));
+ assertVerifyOk(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
+ .addRoute(new RouteSpec("route1").addHop("myservice1"))),
+ new ApplicationSpec().addService("mytable", "myservice1"));
+ assertVerifyOk(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
+ .addHop(new HopSpec("hop1", "myservice1"))
+ .addRoute(new RouteSpec("route1").addHop("hop1"))),
+ new ApplicationSpec().addService("mytable", "myservice1"));
+ assertVerifyOk(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
+ .addHop(new HopSpec("hop1", "route:route2"))
+ .addHop(new HopSpec("hop2", "myservice1"))
+ .addRoute(new RouteSpec("route1").addHop("hop1"))
+ .addRoute(new RouteSpec("route2").addHop("hop2"))),
+ new ApplicationSpec().addService("mytable", "myservice1"));
+ assertVerifyOk(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
+ .addHop(new HopSpec("myhop1", "foo/[bar]/baz").addRecipient("foo/0/baz").addRecipient("foo/1/baz"))),
+ new ApplicationSpec()
+ .addService("mytable", "foo/0/baz")
+ .addService("mytable", "foo/1/baz"));
+ }
+
+ public void testVerifyToggle() {
+ assertVerifyOk(new RoutingSpec(false)
+ .addTable(new RoutingTableSpec("mytable"))
+ .addTable(new RoutingTableSpec("mytable")),
+ new ApplicationSpec());
+ assertVerifyOk(new RoutingSpec().addTable(new RoutingTableSpec("mytable", false)
+ .addHop(new HopSpec("foo", "bar"))
+ .addHop(new HopSpec("foo", "baz"))),
+ new ApplicationSpec());
+ assertVerifyOk(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
+ .addHop(new HopSpec("foo", "", false))),
+ new ApplicationSpec());
+ assertVerifyOk(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
+ .addRoute(new RouteSpec("foo", false))),
+ new ApplicationSpec());
+ }
+
+ public void testVerifyFail() {
+ // Duplicate table.
+ assertVerifyFail(new RoutingSpec()
+ .addTable(new RoutingTableSpec("mytable"))
+ .addTable(new RoutingTableSpec("mytable")),
+ new ApplicationSpec(),
+ Arrays.asList("Routing table 'mytable' is defined 2 times."));
+
+ // Duplicate hop.
+ assertVerifyFail(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
+ .addHop(new HopSpec("foo", "bar"))
+ .addHop(new HopSpec("foo", "baz"))),
+ new ApplicationSpec()
+ .addService("mytable", "bar")
+ .addService("mytable", "baz"),
+ Arrays.asList("Hop 'foo' in routing table 'mytable' is defined 2 times."));
+
+ // Duplicate route.
+ assertVerifyFail(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
+ .addRoute(new RouteSpec("foo").addHop("bar"))
+ .addRoute(new RouteSpec("foo").addHop("baz"))),
+ new ApplicationSpec()
+ .addService("mytable", "bar")
+ .addService("mytable", "baz"),
+ Arrays.asList("Route 'foo' in routing table 'mytable' is defined 2 times."));
+
+ // Empty hop.
+ assertVerifyFail(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
+ .addHop(new HopSpec("foo", ""))),
+ new ApplicationSpec(),
+ Arrays.asList("For hop 'foo' in routing table 'mytable'; Failed to parse empty string."));
+
+ // Empty route.
+ assertVerifyFail(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
+ .addRoute(new RouteSpec("foo"))),
+ new ApplicationSpec(),
+ Arrays.asList("Route 'foo' in routing table 'mytable' has no hops."));
+
+ // Hop error.
+ assertVerifyFail(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
+ .addHop(new HopSpec("foo", "bar/baz cox"))),
+ new ApplicationSpec(),
+ Arrays.asList("For hop 'foo' in routing table 'mytable'; Failed to completely parse 'bar/baz cox'."));
+
+ // Hop error in recipient.
+ assertVerifyFail(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
+ .addHop(new HopSpec("foo", "[bar]").addRecipient("bar/baz cox"))),
+ new ApplicationSpec(),
+ Arrays.asList("For recipient 'bar/baz cox' in hop 'foo' in routing table 'mytable'; Failed to completely parse 'bar/baz cox'."));
+
+ // Hop error in route.
+ assertVerifyFail(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
+ .addRoute(new RouteSpec("foo").addHop("bar/baz cox"))),
+ new ApplicationSpec(),
+ Arrays.asList("For hop 1 in route 'foo' in routing table 'mytable'; Failed to completely parse 'bar/baz cox'."));
+
+ // Hop not found.
+ assertVerifyFail(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
+ .addRoute(new RouteSpec("foo").addHop("bar"))),
+ new ApplicationSpec(),
+ Arrays.asList("Hop 1 in route 'foo' in routing table 'mytable' references 'bar' which is neither a service, a route nor another hop."));
+
+ // Mismatched recipient.
+ assertVerifyFail(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
+ .addHop(new HopSpec("foo", "bar/[baz]/cox").addRecipient("cox/0/bar"))),
+ new ApplicationSpec(),
+ Arrays.asList("Selector 'bar/[baz]/cox' does not match recipient 'cox/0/bar' in hop 'foo' in routing table 'mytable'."));
+
+ // Route not found.
+ assertVerifyFail(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
+ .addHop(new HopSpec("foo", "route:bar"))),
+ new ApplicationSpec(),
+ Arrays.asList("Hop 'foo' in routing table 'mytable' references route 'bar' which does not exist."));
+
+ // Route not found in route.
+ assertVerifyFail(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
+ .addRoute(new RouteSpec("foo").addHop("route:bar"))),
+ new ApplicationSpec(),
+ Arrays.asList("Hop 1 in route 'foo' in routing table 'mytable' references route 'bar' which does not exist."));
+
+ // Service not found.
+ assertVerifyFail(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
+ .addHop(new HopSpec("foo", "bar/baz"))),
+ new ApplicationSpec(),
+ Arrays.asList("Hop 'foo' in routing table 'mytable' references 'bar/baz' which is neither a service, a route nor another hop."));
+
+ // Unexpected recipient.
+ assertVerifyFail(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
+ .addHop(new HopSpec("foo", "bar").addRecipient("baz"))),
+ new ApplicationSpec()
+ .addService("mytable", "bar")
+ .addService("mytable", "baz"),
+ Arrays.asList("Hop 'foo' in routing table 'mytable' has recipients but no policy directive."));
+
+ // Multiple errors.
+ assertVerifyFail(new RoutingSpec()
+ .addTable(new RoutingTableSpec("mytable"))
+ .addTable(new RoutingTableSpec("mytable")
+ .addHop(new HopSpec("hop1", "bar"))
+ .addHop(new HopSpec("hop1", "baz"))
+ .addHop(new HopSpec("hop2", ""))
+ .addHop(new HopSpec("hop3", "bar/baz cox"))
+ .addHop(new HopSpec("hop4", "[bar]").addRecipient("bar/baz cox"))
+ .addHop(new HopSpec("hop5", "bar/[baz]/cox").addRecipient("cox/0/bar"))
+ .addHop(new HopSpec("hop6", "route:route69"))
+ .addHop(new HopSpec("hop7", "bar/baz"))
+ .addHop(new HopSpec("hop8", "bar").addRecipient("baz"))
+ .addRoute(new RouteSpec("route1").addHop("bar"))
+ .addRoute(new RouteSpec("route1").addHop("baz"))
+ .addRoute(new RouteSpec("route2").addHop(""))
+ .addRoute(new RouteSpec("route3").addHop("bar/baz cox"))
+ .addRoute(new RouteSpec("route4").addHop("hop69"))
+ .addRoute(new RouteSpec("route5").addHop("route:route69"))),
+ new ApplicationSpec()
+ .addService("mytable", "bar")
+ .addService("mytable", "baz"),
+ Arrays.asList("Routing table 'mytable' is defined 2 times.",
+ "For hop 'hop2' in routing table 'mytable'; Failed to parse empty string.",
+ "For hop 'hop3' in routing table 'mytable'; Failed to completely parse 'bar/baz cox'.",
+ "For hop 1 in route 'route2' in routing table 'mytable'; Failed to parse empty string.",
+ "For hop 1 in route 'route3' in routing table 'mytable'; Failed to completely parse 'bar/baz cox'.",
+ "For recipient 'bar/baz cox' in hop 'hop4' in routing table 'mytable'; Failed to completely parse 'bar/baz cox'.",
+ "Hop 'hop1' in routing table 'mytable' is defined 2 times.",
+ "Hop 'hop6' in routing table 'mytable' references route 'route69' which does not exist.",
+ "Hop 'hop7' in routing table 'mytable' references 'bar/baz' which is neither a service, a route nor another hop.",
+ "Hop 'hop8' in routing table 'mytable' has recipients but no policy directive.",
+ "Hop 1 in route 'route4' in routing table 'mytable' references 'hop69' which is neither a service, a route nor another hop.",
+ "Hop 1 in route 'route5' in routing table 'mytable' references route 'route69' which does not exist.",
+ "Route 'route1' in routing table 'mytable' is defined 2 times.",
+ "Selector 'bar/[baz]/cox' does not match recipient 'cox/0/bar' in hop 'hop5' in routing table 'mytable'."));
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////
+ //
+ // Utilities
+ //
+ ////////////////////////////////////////////////////////////////////////////////
+
+ private static void assertVerifyOk(RoutingSpec routing, ApplicationSpec app) {
+ assertVerifyFail(routing, app, new ArrayList<String>());
+ }
+
+ private static void assertVerifyFail(RoutingSpec routing, ApplicationSpec app, List<String> expectedErrors) {
+ List<String> errors = new ArrayList<>();
+ routing.verify(app, errors);
+
+ Collections.sort(errors);
+ Collections.sort(expectedErrors);
+ assertEquals(expectedErrors.toString(), errors.toString());
+ }
+
+ private static void assertConfig(RoutingSpec routing) {
+ assertEquals(routing, routing);
+ assertEquals(routing, new RoutingSpec(routing));
+
+ ConfigStore store = new ConfigStore();
+ ConfigAgent subscriber = new ConfigAgent("raw:" + routing.toString(), store);
+ subscriber.subscribe();
+ assertTrue(store.routing.equals(routing));
+ }
+
+ private static void assertApplicationSpec(List<String> services, List<String> patterns) {
+ ApplicationSpec app = new ApplicationSpec();
+ for (String pattern : patterns) {
+ assertFalse(app.isService("foo", pattern));
+ assertFalse(app.isService("bar", pattern));
+ }
+ for (String service : services) {
+ app.addService("foo", service);
+ }
+ for (String pattern : patterns) {
+ assertTrue(app.isService("foo", pattern));
+ assertFalse(app.isService("bar", pattern));
+ }
+ for (String service : services) {
+ app.addService("bar", service);
+ }
+ for (String pattern : patterns) {
+ assertTrue(app.isService("foo", pattern));
+ assertTrue(app.isService("bar", pattern));
+ }
+ }
+
+ private static class ConfigStore implements ConfigHandler {
+
+ RoutingSpec routing = null;
+
+ public void setupRouting(RoutingSpec routing) {
+ this.routing = routing;
+ }
+ }
+}
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/routing/RoutingTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/routing/RoutingTestCase.java new file mode 100644 index 00000000000..911350d3cc9 --- /dev/null +++ b/messagebus/src/test/java/com/yahoo/messagebus/routing/RoutingTestCase.java @@ -0,0 +1,1144 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.routing; + +import com.yahoo.component.Vtag; +import com.yahoo.jrt.ListenFailedException; +import com.yahoo.jrt.slobrok.server.Slobrok; +import com.yahoo.messagebus.*; +import com.yahoo.messagebus.Error; +import com.yahoo.messagebus.network.Identity; +import com.yahoo.messagebus.network.rpc.RPCNetworkParams; +import com.yahoo.messagebus.network.rpc.test.TestServer; +import com.yahoo.messagebus.routing.test.CustomPolicy; +import com.yahoo.messagebus.routing.test.CustomPolicyFactory; +import com.yahoo.messagebus.test.Receptor; +import com.yahoo.messagebus.test.SimpleMessage; +import com.yahoo.messagebus.test.SimpleProtocol; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.*; + +/** + * @author <a href="mailto:havardpe@yahoo-inc.com">Haavard Pettersen</a> + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +public class RoutingTestCase { + + //////////////////////////////////////////////////////////////////////////////// + // + // Setup + // + //////////////////////////////////////////////////////////////////////////////// + + Slobrok slobrok; + TestServer srcServer, dstServer; + SourceSession srcSession; + DestinationSession dstSession; + RetryTransientErrorsPolicy retryPolicy; + + @Before + public void setUp() throws ListenFailedException, UnknownHostException { + slobrok = new Slobrok(); + dstServer = new TestServer(new MessageBusParams().addProtocol(new SimpleProtocol()), + new RPCNetworkParams().setIdentity(new Identity("dst")).setSlobrokConfigId( + TestServer.getSlobrokConfig(slobrok))); + dstSession = dstServer.mb.createDestinationSession( + new DestinationSessionParams().setName("session").setMessageHandler(new Receptor())); + retryPolicy = new RetryTransientErrorsPolicy(); + retryPolicy.setBaseDelay(0); + srcServer = new TestServer(new MessageBusParams().setRetryPolicy(retryPolicy).addProtocol(new SimpleProtocol()), + new RPCNetworkParams().setSlobrokConfigId(TestServer.getSlobrokConfig(slobrok))); + srcSession = srcServer.mb.createSourceSession( + new SourceSessionParams().setTimeout(600.0).setThrottlePolicy(null).setReplyHandler(new Receptor())); + assertTrue(srcServer.waitSlobrok("dst/session", 1)); + } + + @After + public void tearDown() { + slobrok.stop(); + dstSession.destroy(); + dstServer.destroy(); + srcSession.destroy(); + srcServer.destroy(); + } + + //////////////////////////////////////////////////////////////////////////////// + // + // Tests + // + //////////////////////////////////////////////////////////////////////////////// + + @Test + public void requireThatNullRouteIsCaught() { + assertTrue(srcSession.send(createMessage("msg")).isAccepted()); + Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60); + assertNotNull(reply); + System.out.println(reply.getTrace()); + assertEquals(1, reply.getNumErrors()); + assertEquals(ErrorCode.ILLEGAL_ROUTE, reply.getError(0).getCode()); + } + + @Test + public void requireThatEmptyRouteIsCaught() { + assertTrue(srcSession.send(createMessage("msg"), new Route()).isAccepted()); + Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60); + assertNotNull(reply); + System.out.println(reply.getTrace()); + assertEquals(1, reply.getNumErrors()); + assertEquals(ErrorCode.ILLEGAL_ROUTE, reply.getError(0).getCode()); + } + + @Test + public void requireThatHopNameIsExpanded() { + srcServer.setupRouting(new RoutingTableSpec(SimpleProtocol.NAME) + .addHop(new HopSpec("dst", "dst/session"))); + assertTrue(srcSession.send(createMessage("msg"), Route.parse("dst")).isAccepted()); + Message msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60); + assertNotNull(msg); + dstSession.acknowledge(msg); + Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60); + assertNotNull(reply); + System.out.println(reply.getTrace()); + assertFalse(reply.hasErrors()); + } + + @Test + public void requireThatRouteDirectiveWorks() { + srcServer.setupRouting(new RoutingTableSpec(SimpleProtocol.NAME) + .addRoute(new RouteSpec("dst").addHop("dst/session")) + .addHop(new HopSpec("dir", "route:dst"))); + assertTrue(srcSession.send(createMessage("msg"), Route.parse("dir")).isAccepted()); + Message msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60); + assertNotNull(msg); + dstSession.acknowledge(msg); + Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60); + assertNotNull(reply); + System.out.println(reply.getTrace()); + assertFalse(reply.hasErrors()); + } + + @Test + public void requireThatRouteNameIsExpanded() { + srcServer.setupRouting(new RoutingTableSpec(SimpleProtocol.NAME) + .addRoute(new RouteSpec("dst").addHop("dst/session"))); + assertTrue(srcSession.send(createMessage("msg"), Route.parse("dst")).isAccepted()); + Message msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60); + assertNotNull(msg); + dstSession.acknowledge(msg); + Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60); + assertNotNull(reply); + System.out.println(reply.getTrace()); + assertFalse(reply.hasErrors()); + } + + @Test + public void requireThatHopResolutionOverflowIsCaught() { + srcServer.setupRouting(new RoutingTableSpec(SimpleProtocol.NAME) + .addHop(new HopSpec("foo", "bar")) + .addHop(new HopSpec("bar", "foo"))); + assertTrue(srcSession.send(createMessage("msg"), Route.parse("foo")).isAccepted()); + Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60); + assertNotNull(reply); + System.out.println(reply.getTrace()); + assertEquals(1, reply.getNumErrors()); + assertEquals(ErrorCode.ILLEGAL_ROUTE, reply.getError(0).getCode()); + } + + @Test + public void requireThatRouteResolutionOverflowIsCaught() { + srcServer.setupRouting(new RoutingTableSpec(SimpleProtocol.NAME) + .addRoute(new RouteSpec("foo").addHop("route:foo"))); + assertTrue(srcSession.send(createMessage("msg"), "foo").isAccepted()); + Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60); + assertNotNull(reply); + System.out.println(reply.getTrace()); + assertEquals(1, reply.getNumErrors()); + assertEquals(ErrorCode.ILLEGAL_ROUTE, reply.getError(0).getCode()); + } + + @Test + public void requireThatRouteExpansionOnlyReplacesFirstHop() { + srcServer.setupRouting(new RoutingTableSpec(SimpleProtocol.NAME) + .addRoute(new RouteSpec("foo").addHop("dst/session").addHop("bar"))); + assertTrue(srcSession.send(createMessage("msg"), Route.parse("route:foo baz")).isAccepted()); + Message msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60); + assertNotNull(msg); + assertEquals(2, msg.getRoute().getNumHops()); + assertEquals("bar", msg.getRoute().getHop(0).toString()); + assertEquals("baz", msg.getRoute().getHop(1).toString()); + dstSession.acknowledge(msg); + Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60); + assertNotNull(reply); + System.out.println(reply.getTrace()); + assertFalse(reply.hasErrors()); + } + + @Test + public void requireThatErrorDirectiveWorks() { + Route route = Route.parse("foo/bar/baz"); + route.getHop(0).setDirective(1, new ErrorDirective("err")); + assertTrue(srcSession.send(createMessage("msg"), route).isAccepted()); + Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60); + assertNotNull(reply); + System.out.println(reply.getTrace()); + assertEquals(1, reply.getNumErrors()); + assertEquals(ErrorCode.ILLEGAL_ROUTE, reply.getError(0).getCode()); + assertEquals("err", reply.getError(0).getMessage()); + } + + @Test + public void requireThatIllegalSelectIsCaught() { + SimpleProtocol protocol = new SimpleProtocol(); + protocol.addPolicyFactory("Custom", new CustomPolicyFactory()); + srcServer.mb.putProtocol(protocol); + Route route = Route.parse("[Custom: ]"); + assertNotNull(route); + assertTrue(srcSession.send(createMessage("msg"), route).isAccepted()); + Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60); + assertNotNull(reply); + System.out.println(reply.getTrace()); + assertEquals(1, reply.getNumErrors()); + assertEquals(ErrorCode.NO_SERVICES_FOR_ROUTE, reply.getError(0).getCode()); + } + + @Test + public void requireThatEmptySelectIsCaught() { + SimpleProtocol protocol = new SimpleProtocol(); + protocol.addPolicyFactory("Custom", new CustomPolicyFactory()); + srcServer.mb.putProtocol(protocol); + assertTrue(srcSession.send(createMessage("msg"), Route.parse("[Custom]")).isAccepted()); + Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60); + assertNotNull(reply); + System.out.println(reply.getTrace()); + assertEquals(1, reply.getNumErrors()); + assertEquals(ErrorCode.NO_SERVICES_FOR_ROUTE, reply.getError(0).getCode()); + } + + @Test + public void requireThatPolicySelectWorks() { + SimpleProtocol protocol = new SimpleProtocol(); + protocol.addPolicyFactory("Custom", new CustomPolicyFactory()); + srcServer.mb.putProtocol(protocol); + assertTrue(srcSession.send(createMessage("msg"), Route.parse("[Custom:dst/session]")).isAccepted()); + Message msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60); + assertNotNull(msg); + dstSession.acknowledge(msg); + Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60); + assertNotNull(reply); + System.out.println(reply.getTrace()); + assertFalse(reply.hasErrors()); + } + + @Test + public void requireThatTransientErrorsAreRetried() { + assertTrue(srcSession.send(createMessage("msg"), Route.parse("dst/session")).isAccepted()); + Message msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60); + assertNotNull(msg); + Reply reply = new EmptyReply(); + reply.swapState(msg); + reply.addError(new Error(ErrorCode.APP_TRANSIENT_ERROR, "err1")); + dstSession.reply(reply); + assertNotNull(msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60)); + reply = new EmptyReply(); + reply.swapState(msg); + reply.addError(new Error(ErrorCode.APP_TRANSIENT_ERROR, "err2")); + dstSession.reply(reply); + assertNotNull(msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60)); + dstSession.acknowledge(msg); + assertNotNull(reply = ((Receptor)srcSession.getReplyHandler()).getReply(60)); + System.out.println(reply.getTrace()); + assertFalse(reply.hasErrors()); + assertTrace(Arrays.asList("[APP_TRANSIENT_ERROR @ localhost]: err1", + "-[APP_TRANSIENT_ERROR @ localhost]: err1", + "[APP_TRANSIENT_ERROR @ localhost]: err2", + "-[APP_TRANSIENT_ERROR @ localhost]: err2"), + reply.getTrace()); + } + + @Test + public void requireThatTransientErrorsAreRetriedWithPolicy() { + SimpleProtocol protocol = new SimpleProtocol(); + protocol.addPolicyFactory("Custom", new CustomPolicyFactory()); + srcServer.mb.putProtocol(protocol); + assertTrue(srcSession.send(createMessage("msg"), Route.parse("[Custom:dst/session]")).isAccepted()); + Message msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60); + assertNotNull(msg); + Reply reply = new EmptyReply(); + reply.swapState(msg); + reply.addError(new Error(ErrorCode.APP_TRANSIENT_ERROR, "err1")); + dstSession.reply(reply); + assertNotNull(msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60)); + reply = new EmptyReply(); + reply.swapState(msg); + reply.addError(new Error(ErrorCode.APP_TRANSIENT_ERROR, "err2")); + dstSession.reply(reply); + assertNotNull(msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60)); + dstSession.acknowledge(msg); + assertNotNull(reply = ((Receptor)srcSession.getReplyHandler()).getReply(60)); + System.out.println(reply.getTrace()); + assertFalse(reply.hasErrors()); + assertTrace(Arrays.asList("Source session accepted a 3 byte message. 1 message(s) now pending.", + "Running routing policy 'Custom'.", + "Selecting [dst/session].", + "Component 'dst/session' selected by policy 'Custom'.", + "Resolving 'dst/session'.", + "Sending message (version ${VERSION}) from client to 'dst/session'", + "Message (type 1) received at 'dst' for session 'session'.", + "[APP_TRANSIENT_ERROR @ localhost]: err1", + "Sending reply (version ${VERSION}) from 'dst'.", + "Reply (type 0) received at client.", + "Routing policy 'Custom' merging replies.", + "Merged [dst/session].", + "Message scheduled for retry 1 in 0.0 seconds.", + "Resender resending message.", + "Running routing policy 'Custom'.", + "Selecting [dst/session].", + "Component 'dst/session' selected by policy 'Custom'.", + "Resolving 'dst/session'.", + "Sending message (version ${VERSION}) from client to 'dst/session'", + "Message (type 1) received at 'dst' for session 'session'.", + "[APP_TRANSIENT_ERROR @ localhost]: err2", + "Sending reply (version ${VERSION}) from 'dst'.", + "Reply (type 0) received at client.", + "Routing policy 'Custom' merging replies.", + "Merged [dst/session].", + "Message scheduled for retry 2 in 0.0 seconds.", + "Resender resending message.", + "Running routing policy 'Custom'.", + "Selecting [dst/session].", + "Component 'dst/session' selected by policy 'Custom'.", + "Resolving 'dst/session'.", + "Sending message (version ${VERSION}) from client to 'dst/session'", + "Message (type 1) received at 'dst' for session 'session'.", + "Sending reply (version ${VERSION}) from 'dst'.", + "Reply (type 0) received at client.", + "Routing policy 'Custom' merging replies.", + "Merged [dst/session].", + "Source session received reply. 0 message(s) now pending."), + reply.getTrace()); + } + + @Test + public void requireThatRetryCanBeDisabled() { + retryPolicy.setEnabled(false); + assertTrue(srcSession.send(createMessage("msg"), Route.parse("dst/session")).isAccepted()); + Message msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60); + assertNotNull(msg); + Reply reply = new EmptyReply(); + reply.swapState(msg); + reply.addError(new Error(ErrorCode.APP_TRANSIENT_ERROR, "err")); + dstSession.reply(reply); + assertNotNull(reply = ((Receptor)srcSession.getReplyHandler()).getReply(60)); + System.out.println(reply.getTrace()); + assertEquals(1, reply.getNumErrors()); + assertEquals(ErrorCode.APP_TRANSIENT_ERROR, reply.getError(0).getCode()); + } + + @Test + public void requireThatRetryCallsSelect() { + SimpleProtocol protocol = new SimpleProtocol(); + protocol.addPolicyFactory("Custom", new CustomPolicyFactory()); + srcServer.mb.putProtocol(protocol); + assertTrue(srcSession.send(createMessage("msg"), Route.parse("[Custom:dst/session]")).isAccepted()); + Message msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60); + assertNotNull(msg); + Reply reply = new EmptyReply(); + reply.swapState(msg); + reply.addError(new Error(ErrorCode.APP_TRANSIENT_ERROR, "err")); + dstSession.reply(reply); + assertNotNull(msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60)); + dstSession.acknowledge(msg); + assertNotNull(reply = ((Receptor)srcSession.getReplyHandler()).getReply(60)); + System.out.println(reply.getTrace()); + assertFalse(reply.hasErrors()); + assertTrace(Arrays.asList("Selecting [dst/session].", + "[APP_TRANSIENT_ERROR @ localhost]", + "-[APP_TRANSIENT_ERROR @ localhost]", + "Merged [dst/session].", + "Selecting [dst/session].", + "Sending reply", + "Merged [dst/session]."), + reply.getTrace()); + } + + @Test + public void requireThatPolicyCanDisableReselectOnRetry() { + SimpleProtocol protocol = new SimpleProtocol(); + protocol.addPolicyFactory("Custom", new CustomPolicyFactory(false)); + srcServer.mb.putProtocol(protocol); + assertTrue(srcSession.send(createMessage("msg"), Route.parse("[Custom:dst/session]")).isAccepted()); + Message msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60); + assertNotNull(msg); + Reply reply = new EmptyReply(); + reply.swapState(msg); + reply.addError(new Error(ErrorCode.APP_TRANSIENT_ERROR, "err")); + dstSession.reply(reply); + assertNotNull(msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60)); + dstSession.acknowledge(msg); + assertNotNull(reply = ((Receptor)srcSession.getReplyHandler()).getReply(60)); + System.out.println(reply.getTrace()); + assertFalse(reply.hasErrors()); + assertTrace(Arrays.asList("Selecting [dst/session].", + "[APP_TRANSIENT_ERROR @ localhost]", + "-[APP_TRANSIENT_ERROR @ localhost]", + "Merged [dst/session].", + "-Selecting [dst/session].", + "Sending reply", + "Merged [dst/session]."), + reply.getTrace()); + } + + @Test + public void requireThatPolicyCanConsumeErrors() { + SimpleProtocol protocol = new SimpleProtocol(); + protocol.addPolicyFactory("Custom", new CustomPolicyFactory(true, ErrorCode.NO_ADDRESS_FOR_SERVICE)); + srcServer.mb.putProtocol(protocol); + retryPolicy.setEnabled(false); + assertTrue(srcSession.send(createMessage("msg"), Route.parse("[Custom:dst/session,dst/unknown]")).isAccepted()); + Message msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60); + assertNotNull(msg); + dstSession.acknowledge(msg); + Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60); + assertNotNull(reply); + System.out.println(reply.getTrace()); + assertEquals(1, reply.getNumErrors()); + assertEquals(ErrorCode.NO_ADDRESS_FOR_SERVICE, reply.getError(0).getCode()); + assertTrace(Arrays.asList("Selecting [dst/session, dst/unknown].", + "[NO_ADDRESS_FOR_SERVICE @ localhost]", + "Sending reply", + "Merged [dst/session, dst/unknown]."), + reply.getTrace()); + } + + @Test + public void requireThatPolicyOnlyConsumesDeclaredErrors() { + SimpleProtocol protocol = new SimpleProtocol(); + protocol.addPolicyFactory("Custom", new CustomPolicyFactory()); + srcServer.mb.putProtocol(protocol); + retryPolicy.setEnabled(false); + assertTrue(srcSession.send(createMessage("msg"), Route.parse("[Custom:dst/unknown]")).isAccepted()); + Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60); + assertNotNull(reply); + System.out.println(reply.getTrace()); + assertEquals(1, reply.getNumErrors()); + assertEquals(ErrorCode.NO_ADDRESS_FOR_SERVICE, reply.getError(0).getCode()); + assertTrace(Arrays.asList("Selecting [dst/unknown].", + "[NO_ADDRESS_FOR_SERVICE @ localhost]", + "Merged [dst/unknown]."), + reply.getTrace()); + } + + @Test + public void requireThatPolicyCanExpandToPolicy() { + SimpleProtocol protocol = new SimpleProtocol(); + protocol.addPolicyFactory("Custom", new CustomPolicyFactory(true, ErrorCode.NO_ADDRESS_FOR_SERVICE)); + srcServer.mb.putProtocol(protocol); + retryPolicy.setEnabled(false); + assertTrue(srcSession.send(createMessage("msg"), + Route.parse("[Custom:[Custom:dst/session],[Custom:dst/unknown]]")).isAccepted()); + Message msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60); + assertNotNull(msg); + dstSession.acknowledge(msg); + Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60); + assertNotNull(reply); + System.out.println(reply.getTrace()); + assertEquals(1, reply.getNumErrors()); + assertEquals(ErrorCode.NO_ADDRESS_FOR_SERVICE, reply.getError(0).getCode()); + } + + @Test + public void requireThatReplyCanBeRemovedFromChildNodes() { + SimpleProtocol protocol = new SimpleProtocol(); + protocol.addPolicyFactory("Custom", new SimpleProtocol.PolicyFactory() { + + @Override + public RoutingPolicy create(String param) { + return new RemoveReplyPolicy(true, + Arrays.asList(ErrorCode.NO_ADDRESS_FOR_SERVICE), + CustomPolicyFactory.parseRoutes(param), + 0); + } + }); + srcServer.mb.putProtocol(protocol); + retryPolicy.setEnabled(false); + assertTrue(srcSession.send(createMessage("msg"), + Route.parse("[Custom:[Custom:dst/session],[Custom:dst/unknown]]")).isAccepted()); + Message msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60); + assertNotNull(msg); + dstSession.acknowledge(msg); + Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60); + assertNotNull(reply); + System.out.println(reply.getTrace()); + assertFalse(reply.hasErrors()); + assertTrace(Arrays.asList("[NO_ADDRESS_FOR_SERVICE @ localhost]", + "-[NO_ADDRESS_FOR_SERVICE @ localhost]", + "Sending message", + "-Sending message"), + reply.getTrace()); + } + + @Test + public void requireThatSetReplyWorks() { + SimpleProtocol protocol = new SimpleProtocol(); + protocol.addPolicyFactory("Select", new CustomPolicyFactory(true, ErrorCode.APP_FATAL_ERROR)); + protocol.addPolicyFactory("SetReply", new SimpleProtocol.PolicyFactory() { + + @Override + public RoutingPolicy create(String param) { + return new SetReplyPolicy(true, Arrays.asList(ErrorCode.APP_FATAL_ERROR), param); + } + }); + srcServer.mb.putProtocol(protocol); + retryPolicy.setEnabled(false); + assertTrue( + srcSession.send(createMessage("msg"), Route.parse("[Select:[SetReply:foo],dst/session]")).isAccepted()); + Message msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60); + assertNotNull(msg); + dstSession.acknowledge(msg); + Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60); + assertNotNull(reply); + System.out.println(reply.getTrace()); + assertEquals(1, reply.getNumErrors()); + assertEquals(ErrorCode.APP_FATAL_ERROR, reply.getError(0).getCode()); + assertEquals("foo", reply.getError(0).getMessage()); + } + + @Test + public void requireThatReplyCanBeReusedOnRetry() { + SimpleProtocol protocol = new SimpleProtocol(); + protocol.addPolicyFactory("ReuseReply", new SimpleProtocol.PolicyFactory() { + + @Override + public RoutingPolicy create(String param) { + return new ReuseReplyPolicy(false, + Arrays.asList(ErrorCode.APP_FATAL_ERROR), + CustomPolicyFactory.parseRoutes(param)); + } + }); + protocol.addPolicyFactory("SetReply", new SimpleProtocol.PolicyFactory() { + + @Override + public RoutingPolicy create(String param) { + return new SetReplyPolicy(false, + Arrays.asList(ErrorCode.APP_FATAL_ERROR), + param); + } + }); + srcServer.mb.putProtocol(protocol); + assertTrue(srcSession.send(createMessage("msg"), + Route.parse("[ReuseReply:[SetReply:foo],dst/session]")).isAccepted()); + Message msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60); + assertNotNull(msg); + Reply reply = new EmptyReply(); + reply.swapState(msg); + reply.addError(new Error(ErrorCode.APP_TRANSIENT_ERROR, "dst")); + dstSession.reply(reply); + assertNotNull(msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60)); + dstSession.acknowledge(msg); + assertNotNull(reply = ((Receptor)srcSession.getReplyHandler()).getReply(60)); + System.out.println(reply.getTrace()); + assertFalse(reply.hasErrors()); + } + + @Test + public void requireThatReplyCanBeRemovedAndRetried() { + SimpleProtocol protocol = new SimpleProtocol(); + protocol.addPolicyFactory("RemoveReply", new SimpleProtocol.PolicyFactory() { + + @Override + public RoutingPolicy create(String param) { + return new RemoveReplyPolicy(false, + Arrays.asList(ErrorCode.APP_TRANSIENT_ERROR), + CustomPolicyFactory.parseRoutes(param), + 0); + } + }); + protocol.addPolicyFactory("SetReply", new SimpleProtocol.PolicyFactory() { + + @Override + public RoutingPolicy create(String param) { + return new SetReplyPolicy(false, + Arrays.asList(ErrorCode.APP_TRANSIENT_ERROR, ErrorCode.APP_FATAL_ERROR), + param); + } + }); + srcServer.mb.putProtocol(protocol); + assertTrue(srcSession + .send(createMessage("msg"), Route.parse("[RemoveReply:[SetReply:foo],dst/session]")).isAccepted()); + Message msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60); + assertNotNull(msg); + dstSession.acknowledge(msg); + Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60); + assertNotNull(reply); + System.out.println(reply.getTrace()); + assertEquals(1, reply.getNumErrors()); + assertEquals(ErrorCode.APP_FATAL_ERROR, reply.getError(0).getCode()); + assertEquals("foo", reply.getError(0).getMessage()); + assertTrace(Arrays.asList("Resolving '[SetReply:foo]'.", + "Resolving 'dst/session'.", + "Resender resending message.", + "Resolving 'dst/session'.", + "Resolving '[SetReply:foo]'."), + reply.getTrace()); + } + + @Test + public void requireThatIgnoreResultWorks() { + assertTrue(srcSession.send(createMessage("msg"), Route.parse("?dst/session")).isAccepted()); + Message msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60); + assertNotNull(msg); + Reply reply = new EmptyReply(); + reply.swapState(msg); + reply.addError(new Error(ErrorCode.APP_FATAL_ERROR, "dst")); + dstSession.reply(reply); + assertNotNull(reply = ((Receptor)srcSession.getReplyHandler()).getReply(60)); + System.out.println(reply.getTrace()); + assertFalse(reply.hasErrors()); + assertTrace(Arrays.asList("Not waiting for a reply from 'dst/session'."), + reply.getTrace()); + } + + @Test + public void requireThatIgnoreResultCanBeSetInHopBlueprint() { + srcServer.setupRouting(new RoutingTableSpec(SimpleProtocol.NAME) + .addHop(new HopSpec("foo", "dst/session").setIgnoreResult(true))); + assertTrue(srcSession.send(createMessage("msg"), Route.parse("foo")).isAccepted()); + Message msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60); + assertNotNull(msg); + Reply reply = new EmptyReply(); + reply.swapState(msg); + reply.addError(new Error(ErrorCode.APP_FATAL_ERROR, "dst")); + dstSession.reply(reply); + assertNotNull(reply = ((Receptor)srcSession.getReplyHandler()).getReply(60)); + System.out.println(reply.getTrace()); + assertFalse(reply.hasErrors()); + assertTrace(Arrays.asList("Not waiting for a reply from 'dst/session'."), + reply.getTrace()); + } + + @Test + public void requireThatIgnoreFlagPersistsThroughHopLookup() { + setupRouting(new RoutingTableSpec(SimpleProtocol.NAME).addHop(new HopSpec("foo", "dst/unknown"))); + assertSend("?foo"); + assertTrace("Ignoring errors in reply."); + } + + @Test + public void requireThatIgnoreFlagPersistsThroughRouteLookup() { + setupRouting(new RoutingTableSpec(SimpleProtocol.NAME).addRoute(new RouteSpec("foo").addHop("dst/unknown"))); + assertSend("?foo"); + assertTrace("Ignoring errors in reply."); + } + + @Test + public void requireThatIgnoreFlagPersistsThroughPolicySelect() { + setupPolicy("Custom", MyPolicy.newSelectAndMerge("dst/unknown")); + assertSend("?[Custom]"); + assertTrace("Ignoring errors in reply."); + } + + @Test + public void requireThatIgnoreFlagIsSerializedWithMessage() { + assertSend("dst/session foo ?bar"); + Message msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60); + assertNotNull(msg); + Route route = msg.getRoute(); + assertEquals(2, route.getNumHops()); + Hop hop = route.getHop(0); + assertEquals("foo", hop.toString()); + assertFalse(hop.getIgnoreResult()); + hop = route.getHop(1); + assertEquals("?bar", hop.toString()); + assertTrue(hop.getIgnoreResult()); + dstSession.acknowledge(msg); + assertTrace("-Ignoring errors in reply."); + } + + @Test + public void requireThatIgnoreFlagDoesNotInterfere() { + setupPolicy("Custom", MyPolicy.newSelectAndMerge("dst/session")); + assertSend("?[Custom]"); + assertTrace("-Ignoring errors in reply."); + } + + @Test + public void requireThatEmptySelectionCanBeIgnored() { + setupPolicy("Custom", MyPolicy.newEmptySelection()); + assertSend("?[Custom]"); + assertTrace("Ignoring errors in reply."); + } + + @Test + public void requireThatSelectErrorCanBeIgnored() { + setupPolicy("Custom", MyPolicy.newSelectError(ErrorCode.APP_FATAL_ERROR, "foo")); + assertSend("?[Custom]"); + assertTrace("Ignoring errors in reply."); + } + + @Test + public void requireThatSelectExceptionCanBeIgnored() { + setupPolicy("Custom", MyPolicy.newSelectException(new RuntimeException())); + assertSend("?[Custom]"); + assertTrace("Ignoring errors in reply."); + } + + @Test + public void requireThatSelectAndThrowCanBeIgnored() { + setupPolicy("Custom", MyPolicy.newSelectAndThrow("dst/session", new RuntimeException())); + assertSend("?[Custom]"); + assertTrace("Ignoring errors in reply."); + } + + @Test + public void requireThatEmptyMergeCanBeIgnored() { + setupPolicy("Custom", MyPolicy.newEmptyMerge("dst/session")); + assertSend("?[Custom]"); + assertAcknowledge(); + assertTrace("Ignoring errors in reply."); + } + + @Test + public void requireThatMergeErrorCanBeIgnored() { + setupPolicy("Custom", MyPolicy.newMergeError("dst/session", ErrorCode.APP_FATAL_ERROR, "foo")); + assertSend("?[Custom]"); + assertAcknowledge(); + assertTrace("Ignoring errors in reply."); + } + + @Test + public void requireThatMergeExceptionCanBeIgnored() { + setupPolicy("Custom", MyPolicy.newMergeException("dst/session", new RuntimeException())); + assertSend("?[Custom]"); + assertAcknowledge(); + assertTrace("Ignoring errors in reply."); + } + + @Test + public void requireThatMergeAndThrowCanBeIgnored() { + setupPolicy("Custom", MyPolicy.newMergeAndThrow("dst/session", new RuntimeException())); + assertSend("?[Custom]"); + assertAcknowledge(); + assertTrace("Ignoring errors in reply."); + } + + @Test + public void requireThatAllocServiceAddressCanBeIgnored() { + assertSend("?dst/unknown"); + assertTrace("Ignoring errors in reply."); + } + + @Test + public void requireThatDepthLimitCanBeIgnored() { + setupPolicy("Custom", MyPolicy.newSelectAndMerge("[Custom]")); + assertSend("?[Custom]"); + assertTrace("Ignoring errors in reply."); + } + + @Test + public void requireThatRouteCanBeEmptyInDestination() { + assertTrue(srcSession.send(createMessage("msg"), Route.parse("dst/session")).isAccepted()); + Message msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60); + assertNotNull(msg); + assertNull(msg.getRoute()); + dstSession.acknowledge(msg); + Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60); + assertNotNull(reply); + System.out.println(reply.getTrace()); + } + + @Test + public void requireThatOnlyActiveNodesAreAborted() { + SimpleProtocol protocol = new SimpleProtocol(); + protocol.addPolicyFactory("Custom", new CustomPolicyFactory(false)); + protocol.addPolicyFactory("SetReply", new SimpleProtocol.PolicyFactory() { + + @Override + public RoutingPolicy create(String param) { + return new SetReplyPolicy(false, + Arrays.asList(ErrorCode.APP_TRANSIENT_ERROR, + ErrorCode.APP_TRANSIENT_ERROR, + ErrorCode.APP_FATAL_ERROR), + param); + } + }); + srcServer.mb.putProtocol(protocol); + assertTrue(srcSession.send(createMessage("msg"), + Route.parse("[Custom:[SetReply:foo],?bar,dst/session]")).isAccepted()); + Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60); + assertNotNull(reply); + System.out.println(reply.getTrace()); + assertEquals(2, reply.getNumErrors()); + assertEquals(ErrorCode.APP_FATAL_ERROR, reply.getError(0).getCode()); + assertEquals(ErrorCode.SEND_ABORTED, reply.getError(1).getCode()); + } + + @Test + public void requireThatTimeoutWorks() { + retryPolicy.setBaseDelay(0.01); + srcSession.setTimeout(0.5); + assertTrue(srcSession.send(createMessage("msg"), Route.parse("dst/unknown")).isAccepted()); + Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60); + assertNotNull(reply); + System.out.println(reply.getTrace()); + assertEquals(2, reply.getNumErrors()); + assertEquals(ErrorCode.NO_ADDRESS_FOR_SERVICE, reply.getError(0).getCode()); + assertEquals(ErrorCode.TIMEOUT, reply.getError(1).getCode()); + } + + @Test + public void requireThatUnknownPolicyIsCaught() { + assertTrue(srcSession.send(createMessage("msg"), Route.parse("[Unknown]")).isAccepted()); + Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60); + assertNotNull(reply); + System.out.println(reply.getTrace()); + assertEquals(1, reply.getNumErrors()); + assertEquals(ErrorCode.UNKNOWN_POLICY, reply.getError(0).getCode()); + } + + private SimpleProtocol.PolicyFactory exceptionOnSelectThrowingMockFactory() { + return new SimpleProtocol.PolicyFactory() { + + @Override + public RoutingPolicy create(String param) { + return new RoutingPolicy() { + + @Override + public void select(RoutingContext context) { + throw new RuntimeException("69"); + } + + @Override + public void merge(RoutingContext context) { + } + + @Override + public void destroy() { + } + }; + } + }; + } + + @Test + public void requireThatSelectExceptionIsCaught() { + SimpleProtocol protocol = new SimpleProtocol(); + protocol.addPolicyFactory("Custom", exceptionOnSelectThrowingMockFactory()); + srcServer.mb.putProtocol(protocol); + assertTrue(srcSession.send(createMessage("msg"), Route.parse("[Custom]")).isAccepted()); + Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60); + assertNotNull(reply); + System.out.println(reply.getTrace()); + assertEquals(1, reply.getNumErrors()); + assertEquals(ErrorCode.POLICY_ERROR, reply.getError(0).getCode()); + assertTrue(reply.getError(0).getMessage().contains("69")); + } + + @Test + public void selectExceptionIncludesStackTraceInMessage() { + SimpleProtocol protocol = new SimpleProtocol(); + protocol.addPolicyFactory("Custom", exceptionOnSelectThrowingMockFactory()); + srcServer.mb.putProtocol(protocol); + assertTrue(srcSession.send(createMessage("msg"), Route.parse("[Custom]")).isAccepted()); + Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60); + assertEquals(ErrorCode.POLICY_ERROR, reply.getError(0).getCode()); + // Attempting any sort of full matching of the stack trace is brittle, so + // simplify by assuming any message which mentions the source file of the + // originating exception is good to go. + assertTrue(reply.getError(0).getMessage().contains("RoutingTestCase")); + } + + @Test + public void requireThatMergeExceptionIsCaught() { + SimpleProtocol protocol = new SimpleProtocol(); + protocol.addPolicyFactory("Custom", new SimpleProtocol.PolicyFactory() { + + @Override + public RoutingPolicy create(String param) { + return new RoutingPolicy() { + + @Override + public void select(RoutingContext context) { + context.addChild(Route.parse("dst/session")); + } + + @Override + public void merge(RoutingContext context) { + throw new RuntimeException("69"); + } + + @Override + public void destroy() { + + } + }; + } + }); + srcServer.mb.putProtocol(protocol); + assertTrue(srcSession.send(createMessage("msg"), Route.parse("[Custom]")).isAccepted()); + Message msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60); + assertNotNull(msg); + dstSession.acknowledge(msg); + Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60); + assertNotNull(reply); + System.out.println(reply.getTrace()); + assertEquals(1, reply.getNumErrors()); + assertEquals(ErrorCode.POLICY_ERROR, reply.getError(0).getCode()); + assertTrue(reply.getError(0).getMessage().contains("69")); + } + + //////////////////////////////////////////////////////////////////////////////// + // + // Utilities + // + //////////////////////////////////////////////////////////////////////////////// + + private static Message createMessage(String msg) { + SimpleMessage ret = new SimpleMessage(msg); + ret.getTrace().setLevel(9); + return ret; + } + + private void setupRouting(RoutingTableSpec spec) { + srcServer.setupRouting(spec); + } + + private void setupPolicy(String policyName, SimpleProtocol.PolicyFactory policyFactory) { + SimpleProtocol protocol = new SimpleProtocol(); + protocol.addPolicyFactory(policyName, policyFactory); + srcServer.mb.putProtocol(protocol); + } + + private void assertSend(String route) { + assertTrue(srcSession.send(createMessage("msg").setRoute(Route.parse(route))).isAccepted()); + } + + private void assertAcknowledge() { + Message msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60); + assertNotNull(msg); + dstSession.acknowledge(msg); + } + + private void assertTrace(String... expectedTrace) { + Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60); + assertNotNull(reply); + System.out.println(reply.getTrace()); + assertFalse(reply.hasErrors()); + assertTrace(Arrays.asList(expectedTrace), reply.getTrace()); + } + + public static void assertTrace(List<String> expected, Trace trace) { + String actual = trace.toString(); + for (int i = 0, pos = -1; i < expected.size(); ++i) { + String line = expected.get(i).replaceFirst("\\$\\{VERSION\\}", Vtag.currentVersion.toString()); + if (line.charAt(0) == '-') { + String str = line.substring(1); + assertTrue("Line " + i + " '" + str + "' not expected.", + actual.indexOf(str, pos + 1) < 0); + } else { + pos = actual.indexOf(line, pos + 1); + assertTrue("Line " + i + " '" + line + "' missing.", pos >= 0); + } + } + } + + private static class RemoveReplyPolicy extends CustomPolicy { + + final int idxRemove; + + public RemoveReplyPolicy(boolean selectOnRetry, List<Integer> consumableErrors, List<Route> routes, + int idxRemove) { + super(selectOnRetry, consumableErrors, routes); + this.idxRemove = idxRemove; + } + + public void merge(RoutingContext ctx) { + ctx.setReply(ctx.getChildIterator().skip(idxRemove).removeReply()); + } + + @Override + public void destroy() { + } + } + + private static class ReuseReplyPolicy extends CustomPolicy { + + final List<Integer> errorMask = new ArrayList<>(); + + public ReuseReplyPolicy(boolean selectOnRetry, List<Integer> errorMask, + List<Route> routes) { + super(selectOnRetry, errorMask, routes); + this.errorMask.addAll(errorMask); + } + + public void merge(RoutingContext ctx) { + Reply ret = new EmptyReply(); + int idx = 0; + int idxFirstOk = -1; + for (RoutingNodeIterator it = ctx.getChildIterator(); + it.isValid(); it.next(), ++idx) { + Reply ref = it.getReplyRef(); + if (!ref.hasErrors()) { + if (idxFirstOk < 0) { + idxFirstOk = idx; + } + } else { + for (int i = 0; i < ref.getNumErrors(); ++i) { + Error err = ref.getError(i); + if (!errorMask.contains(err.getCode())) { + ret.addError(err); + } + } + } + } + if (ret.hasErrors()) { + ctx.setReply(ret); + } else { + ctx.setReply(ctx.getChildIterator().skip(idxFirstOk).removeReply()); + } + } + + @Override + public void destroy() { + } + } + + private static class SetReplyPolicy implements RoutingPolicy { + + final boolean selectOnRetry; + final List<Integer> errors = new ArrayList<>(); + final String param; + int idx = 0; + + public SetReplyPolicy(boolean selectOnRetry, List<Integer> errors, String param) { + this.selectOnRetry = selectOnRetry; + this.errors.addAll(errors); + this.param = param; + } + + public void select(RoutingContext ctx) { + int idx = this.idx++; + int err = errors.get(idx < errors.size() ? idx : errors.size() - 1); + if (err != ErrorCode.NONE) { + ctx.setError(err, param); + } else { + ctx.setReply(new EmptyReply()); + } + ctx.setSelectOnRetry(selectOnRetry); + } + + public void merge(RoutingContext ctx) { + Reply reply = new EmptyReply(); + reply.addError(new Error(ErrorCode.FATAL_ERROR, + "Merge should not be called when select() sets a reply.")); + ctx.setReply(reply); + } + + public void destroy() { + } + } + + private static class MyPolicy implements SimpleProtocol.PolicyFactory { + + final Route selectRoute; + final Reply selectReply; + final Reply mergeReply; + final RuntimeException selectException; + final RuntimeException mergeException; + final boolean mergeFromChild; + + MyPolicy(Route selectRoute, Reply selectReply, RuntimeException selectException, + Reply mergeReply, RuntimeException mergeException, boolean mergeFromChild) { + this.selectRoute = selectRoute; + this.selectReply = selectReply; + this.selectException = selectException; + this.mergeReply = mergeReply; + this.mergeException = mergeException; + this.mergeFromChild = mergeFromChild; + } + + @Override + public RoutingPolicy create(String param) { + return new RoutingPolicy() { + + @Override + public void select(RoutingContext context) { + if (selectRoute != null) { + context.addChild(selectRoute); + } + if (selectReply != null) { + context.setReply(selectReply); + } + if (selectException != null) { + throw selectException; + } + } + + @Override + public void merge(RoutingContext context) { + if (mergeReply != null) { + context.setReply(mergeReply); + } else if (mergeFromChild) { + context.setReply(context.getChildIterator().removeReply()); + } + if (mergeException != null) { + throw mergeException; + } + } + + @Override + public void destroy() { + + } + }; + } + + static Reply newErrorReply(int errCode, String errMessage) { + Reply reply = new EmptyReply(); + reply.addError(new Error(errCode, errMessage)); + return reply; + } + + static MyPolicy newSelectAndMerge(String select) { + return new MyPolicy(Route.parse(select), null, null, null, null, true); + } + + static MyPolicy newEmptySelection() { + return new MyPolicy(null, null, null, null, null, false); + } + + static MyPolicy newSelectError(int errCode, String errMessage) { + return new MyPolicy(null, newErrorReply(errCode, errMessage), null, null, null, false); + } + + static MyPolicy newSelectException(RuntimeException e) { + return new MyPolicy(null, null, e, null, null, false); + } + + static MyPolicy newSelectAndThrow(String select, RuntimeException e) { + return new MyPolicy(Route.parse(select), null, e, null, null, false); + } + + static MyPolicy newEmptyMerge(String select) { + return new MyPolicy(Route.parse(select), null, null, null, null, false); + } + + static MyPolicy newMergeError(String select, int errCode, String errMessage) { + return new MyPolicy(Route.parse(select), null, null, newErrorReply(errCode, errMessage), null, false); + } + + static MyPolicy newMergeException(String select, RuntimeException e) { + return new MyPolicy(Route.parse(select), null, null, null, e, false); + } + + static MyPolicy newMergeAndThrow(String select, RuntimeException e) { + return new MyPolicy(Route.parse(select), null, null, null, e, true); + } + } +} diff --git a/messagebus/src/test/java/com/yahoo/messagebus/test/QueueAdapterTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/test/QueueAdapterTestCase.java new file mode 100644 index 00000000000..000505f6349 --- /dev/null +++ b/messagebus/src/test/java/com/yahoo/messagebus/test/QueueAdapterTestCase.java @@ -0,0 +1,108 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.test; + +import com.yahoo.messagebus.Message; +import com.yahoo.messagebus.Reply; +import com.yahoo.text.Utf8String; +import org.junit.Test; + +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.*; + +/** + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen Hult</a> + */ +public class QueueAdapterTestCase { + + private static final int NO_WAIT = 0; + private static final int WAIT_FOREVER = 60; + + @Test + public void requireThatAccessorsWork() { + QueueAdapter queue = new QueueAdapter(); + assertTrue(queue.isEmpty()); + assertEquals(0, queue.size()); + + Message msg = new MyMessage(); + queue.handleMessage(msg); + assertFalse(queue.isEmpty()); + assertEquals(1, queue.size()); + + MyReply reply = new MyReply(); + queue.handleReply(reply); + assertFalse(queue.isEmpty()); + assertEquals(2, queue.size()); + + assertSame(msg, queue.dequeue()); + assertSame(reply, queue.dequeue()); + } + + @Test + public void requireThatSizeCanBeWaitedFor() { + final QueueAdapter queue = new QueueAdapter(); + assertTrue(queue.waitSize(0, NO_WAIT)); + assertFalse(queue.waitSize(1, NO_WAIT)); + queue.handleMessage(new MyMessage()); + assertFalse(queue.waitSize(0, NO_WAIT)); + assertTrue(queue.waitSize(1, NO_WAIT)); + + Thread thread = new Thread() { + + @Override + public void run() { + try { + Thread.sleep(100); + queue.handleMessage(new MyMessage()); + } catch (InterruptedException e) { + + } + } + }; + thread.start(); + assertTrue(queue.waitSize(2, WAIT_FOREVER)); + } + + @Test + public void requireThatWaitCanBeInterrupted() throws InterruptedException { + final QueueAdapter queue = new QueueAdapter(); + final AtomicReference<Boolean> result = new AtomicReference<>(); + Thread thread = new Thread() { + + @Override + public void run() { + result.set(queue.waitSize(1, WAIT_FOREVER)); + } + }; + thread.start(); + thread.interrupt(); + thread.join(); + assertEquals(Boolean.FALSE, result.get()); + } + + private static class MyMessage extends Message { + + @Override + public Utf8String getProtocol() { + return null; + } + + @Override + public int getType() { + return 0; + } + } + + private static class MyReply extends Reply { + + @Override + public Utf8String getProtocol() { + return null; + } + + @Override + public int getType() { + return 0; + } + } +} diff --git a/messagebus/src/test/java/com/yahoo/messagebus/test/ReceptorTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/test/ReceptorTestCase.java new file mode 100644 index 00000000000..a34f37b0196 --- /dev/null +++ b/messagebus/src/test/java/com/yahoo/messagebus/test/ReceptorTestCase.java @@ -0,0 +1,143 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.test; + +import com.yahoo.messagebus.Message; +import com.yahoo.messagebus.Reply; +import com.yahoo.text.Utf8String; +import org.junit.Test; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.*; + +/** + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen Hult</a> + */ +public class ReceptorTestCase { + + @Test + public void requireThatAccessorsWork() { + Receptor receptor = new Receptor(); + assertNull(receptor.getMessage(0)); + Message msg = new MyMessage(); + receptor.handleMessage(msg); + assertSame(msg, receptor.getMessage(0)); + + Reply reply = new MyReply(); + receptor.handleReply(reply); + assertSame(reply, receptor.getReply(0)); + } + + @Test + public void requireThatMessagesAndRepliesAreTrackedIndividually() { + Receptor receptor = new Receptor(); + receptor.handleMessage(new MyMessage()); + receptor.handleReply(new MyReply()); + assertNotNull(receptor.getMessage(0)); + assertNotNull(receptor.getReply(0)); + + receptor.handleMessage(new MyMessage()); + receptor.handleReply(new MyReply()); + assertNotNull(receptor.getReply(0)); + assertNotNull(receptor.getMessage(0)); + } + + @Test + public void requireThatMessagesCanBeWaitedFor() { + final Receptor receptor = new Receptor(); + Thread thread = new Thread() { + + @Override + public void run() { + try { + Thread.sleep(100); + receptor.handleMessage(new MyMessage()); + } catch (InterruptedException e) { + + } + } + }; + thread.start(); + assertNotNull(receptor.getMessage(60)); + } + + @Test + public void requireThatMessageWaitCanBeInterrupted() throws InterruptedException { + final Receptor receptor = new Receptor(); + final CountDownLatch latch = new CountDownLatch(1); + Thread thread = new Thread() { + + @Override + public void run() { + receptor.getMessage(60); + latch.countDown(); + } + }; + thread.start(); + thread.interrupt(); + assertTrue(latch.await(30, TimeUnit.SECONDS)); + } + + @Test + public void requireThatRepliesCanBeWaitedFor() { + final Receptor receptor = new Receptor(); + Thread thread = new Thread() { + + @Override + public void run() { + try { + Thread.sleep(100); + receptor.handleReply(new MyReply()); + } catch (InterruptedException e) { + + } + } + }; + thread.start(); + assertNotNull(receptor.getReply(60)); + } + + @Test + public void requireThatReplyWaitCanBeInterrupted() throws InterruptedException { + final Receptor receptor = new Receptor(); + final CountDownLatch latch = new CountDownLatch(1); + Thread thread = new Thread() { + + @Override + public void run() { + receptor.getReply(60); + latch.countDown(); + } + }; + thread.start(); + thread.interrupt(); + assertTrue(latch.await(30, TimeUnit.SECONDS)); + } + + private static class MyMessage extends Message { + + @Override + public Utf8String getProtocol() { + return null; + } + + @Override + public int getType() { + return 0; + } + } + + private static class MyReply extends Reply { + + @Override + public Utf8String getProtocol() { + return null; + } + + @Override + public int getType() { + return 0; + } + } +} diff --git a/messagebus/src/test/java/com/yahoo/messagebus/test/SimpleMessageTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/test/SimpleMessageTestCase.java new file mode 100644 index 00000000000..ead0fdb88b4 --- /dev/null +++ b/messagebus/src/test/java/com/yahoo/messagebus/test/SimpleMessageTestCase.java @@ -0,0 +1,25 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.test; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen Hult</a> + */ +public class SimpleMessageTestCase { + + @Test + public void requireThatAccessorsWork() { + SimpleMessage msg = new SimpleMessage("foo"); + assertEquals(SimpleProtocol.MESSAGE, msg.getType()); + assertEquals(SimpleProtocol.NAME, msg.getProtocol()); + assertEquals(3, msg.getApproxSize()); + assertEquals("foo", msg.getValue()); + msg.setValue("bar"); + assertEquals("bar", msg.getValue()); + } +} diff --git a/messagebus/src/test/java/com/yahoo/messagebus/test/SimpleProtocolTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/test/SimpleProtocolTestCase.java new file mode 100644 index 00000000000..ce42762f235 --- /dev/null +++ b/messagebus/src/test/java/com/yahoo/messagebus/test/SimpleProtocolTestCase.java @@ -0,0 +1,65 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.test; + +import com.yahoo.component.Version; +import com.yahoo.messagebus.EmptyReply; +import com.yahoo.messagebus.Routable; +import org.junit.Test; + +import static org.junit.Assert.*; + +/** + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen Hult</a> + */ +public class SimpleProtocolTestCase { + + private static final Version VERSION = new Version(1); + private static final SimpleProtocol PROTOCOL = new SimpleProtocol(); + + @Test + public void requireThatNameIsSet() { + assertEquals(SimpleProtocol.NAME, PROTOCOL.getName()); + } + + @Test + public void requireThatMetricSetIsNull() { + assertNull(PROTOCOL.getMetrics()); + } + + @Test + public void requireThatMessageCanBeEncodedAndDecoded() { + SimpleMessage msg = new SimpleMessage("foo"); + byte[] buf = PROTOCOL.encode(Version.emptyVersion, msg); + Routable routable = PROTOCOL.decode(Version.emptyVersion, buf); + assertNotNull(routable); + assertEquals(SimpleMessage.class, routable.getClass()); + msg = (SimpleMessage)routable; + assertEquals("foo", msg.getValue()); + } + + @Test + public void requireThatReplyCanBeDecoded() { + SimpleReply reply = new SimpleReply("foo"); + byte[] buf = PROTOCOL.encode(Version.emptyVersion, reply); + Routable routable = PROTOCOL.decode(Version.emptyVersion, buf); + assertNotNull(routable); + assertEquals(SimpleReply.class, routable.getClass()); + reply = (SimpleReply)routable; + assertEquals("foo", reply.getValue()); + } + + @Test + public void requireThatUnknownRoutablesAreNotEncoded() { + assertNull(PROTOCOL.encode(VERSION, new EmptyReply())); + } + + @Test + public void requireThatEmptyBufferIsNotDecoded() { + assertNull(PROTOCOL.decode(VERSION, new byte[0])); + } + + @Test + public void requireThatUnknownBufferIsNotDecoded() { + assertNull(PROTOCOL.decode(VERSION, new byte[] { 'U' })); + } +} diff --git a/messagebus/src/test/java/com/yahoo/messagebus/test/SimpleReplyTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/test/SimpleReplyTestCase.java new file mode 100644 index 00000000000..474fea14ac7 --- /dev/null +++ b/messagebus/src/test/java/com/yahoo/messagebus/test/SimpleReplyTestCase.java @@ -0,0 +1,22 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus.test; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +/** + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen Hult</a> + */ +public class SimpleReplyTestCase { + + @Test + public void requireThatAccessorsWork() { + SimpleReply reply = new SimpleReply("foo"); + assertEquals(SimpleProtocol.REPLY, reply.getType()); + assertEquals(SimpleProtocol.NAME, reply.getProtocol()); + assertEquals("foo", reply.getValue()); + reply.setValue("bar"); + assertEquals("bar", reply.getValue()); + } +} |