summaryrefslogtreecommitdiffstats
path: root/messagebus/src/test
diff options
context:
space:
mode:
authorJon Bratseth <bratseth@yahoo-inc.com>2016-06-15 23:09:44 +0200
committerJon Bratseth <bratseth@yahoo-inc.com>2016-06-15 23:09:44 +0200
commit72231250ed81e10d66bfe70701e64fa5fe50f712 (patch)
tree2728bba1131a6f6e5bdf95afec7d7ff9358dac50 /messagebus/src/test
Publish
Diffstat (limited to 'messagebus/src/test')
-rw-r--r--messagebus/src/test/files/.gitignore1
-rwxr-xr-xmessagebus/src/test/java/com/yahoo/messagebus/ChokeTestCase.java169
-rwxr-xr-xmessagebus/src/test/java/com/yahoo/messagebus/ConfigAgentTestCase.java199
-rw-r--r--messagebus/src/test/java/com/yahoo/messagebus/CustomTimer.java17
-rwxr-xr-xmessagebus/src/test/java/com/yahoo/messagebus/ErrorTestCase.java92
-rw-r--r--messagebus/src/test/java/com/yahoo/messagebus/MessageBusTestCase.java144
-rw-r--r--messagebus/src/test/java/com/yahoo/messagebus/MessengerTestCase.java124
-rw-r--r--messagebus/src/test/java/com/yahoo/messagebus/ProtocolRepositoryTestCase.java103
-rw-r--r--messagebus/src/test/java/com/yahoo/messagebus/RateThrottlingTestCase.java39
-rwxr-xr-xmessagebus/src/test/java/com/yahoo/messagebus/RoutableTestCase.java114
-rw-r--r--messagebus/src/test/java/com/yahoo/messagebus/SendProxyTestCase.java169
-rw-r--r--messagebus/src/test/java/com/yahoo/messagebus/SequencerTestCase.java179
-rwxr-xr-xmessagebus/src/test/java/com/yahoo/messagebus/SimpleTripTestCase.java53
-rw-r--r--messagebus/src/test/java/com/yahoo/messagebus/ThrottlerTestCase.java230
-rwxr-xr-xmessagebus/src/test/java/com/yahoo/messagebus/TimeoutTestCase.java102
-rwxr-xr-xmessagebus/src/test/java/com/yahoo/messagebus/TraceTestCase.java286
-rwxr-xr-xmessagebus/src/test/java/com/yahoo/messagebus/TraceTripTestCase.java116
-rw-r--r--messagebus/src/test/java/com/yahoo/messagebus/network/IdentityTestCase.java28
-rw-r--r--messagebus/src/test/java/com/yahoo/messagebus/network/local/LocalNetworkTest.java132
-rw-r--r--messagebus/src/test/java/com/yahoo/messagebus/network/rpc/BasicNetworkTestCase.java152
-rw-r--r--messagebus/src/test/java/com/yahoo/messagebus/network/rpc/LoadBalanceTestCase.java96
-rwxr-xr-xmessagebus/src/test/java/com/yahoo/messagebus/network/rpc/OOSTestCase.java200
-rw-r--r--messagebus/src/test/java/com/yahoo/messagebus/network/rpc/RPCNetworkTestCase.java100
-rwxr-xr-xmessagebus/src/test/java/com/yahoo/messagebus/network/rpc/SendAdapterTestCase.java157
-rwxr-xr-xmessagebus/src/test/java/com/yahoo/messagebus/network/rpc/ServiceAddressTestCase.java90
-rw-r--r--messagebus/src/test/java/com/yahoo/messagebus/network/rpc/ServicePoolTestCase.java57
-rw-r--r--messagebus/src/test/java/com/yahoo/messagebus/network/rpc/SlobrokTestCase.java151
-rwxr-xr-xmessagebus/src/test/java/com/yahoo/messagebus/network/rpc/TargetPoolTestCase.java112
-rwxr-xr-xmessagebus/src/test/java/com/yahoo/messagebus/routing/AdvancedRoutingTestCase.java119
-rwxr-xr-xmessagebus/src/test/java/com/yahoo/messagebus/routing/ResenderTestCase.java200
-rw-r--r--messagebus/src/test/java/com/yahoo/messagebus/routing/RetryPolicyTestCase.java32
-rwxr-xr-xmessagebus/src/test/java/com/yahoo/messagebus/routing/RouteParserTestCase.java168
-rwxr-xr-xmessagebus/src/test/java/com/yahoo/messagebus/routing/RoutingContextTestCase.java258
-rwxr-xr-xmessagebus/src/test/java/com/yahoo/messagebus/routing/RoutingSpecTestCase.java336
-rw-r--r--messagebus/src/test/java/com/yahoo/messagebus/routing/RoutingTestCase.java1144
-rw-r--r--messagebus/src/test/java/com/yahoo/messagebus/test/QueueAdapterTestCase.java108
-rw-r--r--messagebus/src/test/java/com/yahoo/messagebus/test/ReceptorTestCase.java143
-rw-r--r--messagebus/src/test/java/com/yahoo/messagebus/test/SimpleMessageTestCase.java25
-rw-r--r--messagebus/src/test/java/com/yahoo/messagebus/test/SimpleProtocolTestCase.java65
-rw-r--r--messagebus/src/test/java/com/yahoo/messagebus/test/SimpleReplyTestCase.java22
40 files changed, 6032 insertions, 0 deletions
diff --git a/messagebus/src/test/files/.gitignore b/messagebus/src/test/files/.gitignore
new file mode 100644
index 00000000000..b1f2d513ab4
--- /dev/null
+++ b/messagebus/src/test/files/.gitignore
@@ -0,0 +1 @@
+test.cfg
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/ChokeTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/ChokeTestCase.java
new file mode 100755
index 00000000000..cde801d81f2
--- /dev/null
+++ b/messagebus/src/test/java/com/yahoo/messagebus/ChokeTestCase.java
@@ -0,0 +1,169 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus;
+
+import com.yahoo.jrt.ListenFailedException;
+import com.yahoo.jrt.slobrok.server.Slobrok;
+import com.yahoo.messagebus.network.Identity;
+import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
+import com.yahoo.messagebus.network.rpc.test.TestServer;
+import com.yahoo.messagebus.routing.Route;
+import com.yahoo.messagebus.test.Receptor;
+import com.yahoo.messagebus.test.SimpleMessage;
+import com.yahoo.messagebus.test.SimpleProtocol;
+import junit.framework.TestCase;
+
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public class ChokeTestCase extends TestCase {
+
+ ////////////////////////////////////////////////////////////////////////////////
+ //
+ // Setup
+ //
+ ////////////////////////////////////////////////////////////////////////////////
+
+ Slobrok slobrok;
+ TestServer srcServer, dstServer;
+ SourceSession srcSession;
+ DestinationSession dstSession;
+
+ @Override
+ public void setUp() throws ListenFailedException, UnknownHostException {
+ slobrok = new Slobrok();
+ dstServer = new TestServer(new MessageBusParams().addProtocol(new SimpleProtocol()),
+ new RPCNetworkParams().setIdentity(new Identity("dst")).setSlobrokConfigId(TestServer.getSlobrokConfig(slobrok)));
+ dstSession = dstServer.mb.createDestinationSession(new DestinationSessionParams().setName("session").setMessageHandler(new Receptor()));
+ srcServer = new TestServer(new MessageBusParams().setRetryPolicy(null).addProtocol(new SimpleProtocol()),
+ new RPCNetworkParams().setSlobrokConfigId(TestServer.getSlobrokConfig(slobrok)));
+ srcSession = srcServer.mb.createSourceSession(
+ new SourceSessionParams().setTimeout(600.0).setThrottlePolicy(null).setReplyHandler(new Receptor()));
+ assertTrue(srcServer.waitSlobrok("dst/session", 1));
+ }
+
+ @Override
+ public void tearDown() {
+ slobrok.stop();
+ dstSession.destroy();
+ dstServer.destroy();
+ srcSession.destroy();
+ srcServer.destroy();
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////
+ //
+ // Tests
+ //
+ ////////////////////////////////////////////////////////////////////////////////
+
+ public void testMaxCount() {
+ int max = 10;
+ dstServer.mb.setMaxPendingCount(max);
+ List<Message> lst = new ArrayList<>();
+ for (int i = 0; i < max * 2; ++i) {
+ if (i < max) {
+ assertEquals(i, dstServer.mb.getPendingCount());
+ } else {
+ assertEquals(max, dstServer.mb.getPendingCount());
+ }
+ assertTrue(srcSession.send(createMessage("msg"), Route.parse("dst/session")).isAccepted());
+ if (i < max) {
+ Message msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60);
+ assertNotNull(msg);
+ lst.add(msg);
+ } else {
+ Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60);
+ assertNotNull(reply);
+ assertEquals(1, reply.getNumErrors());
+ assertEquals(ErrorCode.SESSION_BUSY, reply.getError(0).getCode());
+ }
+ }
+ for (int i = 0; i < 5; ++i) {
+ Message msg = lst.remove(0);
+ dstSession.acknowledge(msg);
+
+ Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60);
+ assertNotNull(reply);
+ assertFalse(reply.hasErrors());
+ assertNotNull(msg = reply.getMessage());
+ assertTrue(srcSession.send(msg, Route.parse("dst/session")).isAccepted());
+
+ assertNotNull(msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60));
+ lst.add(msg);
+ }
+ while (!lst.isEmpty()) {
+ assertEquals(lst.size(), dstServer.mb.getPendingCount());
+ Message msg = lst.remove(0);
+ dstSession.acknowledge(msg);
+
+ Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60);
+ assertNotNull(reply);
+ assertFalse(reply.hasErrors());
+ }
+ assertEquals(0, dstServer.mb.getPendingCount());
+ }
+
+ public void testMaxSize() {
+ int size = createMessage("msg").getApproxSize();
+ int max = size * 10;
+ dstServer.mb.setMaxPendingSize(max);
+ List<Message> lst = new ArrayList<>();
+ for (int i = 0; i < max * 2; i += size) {
+ if (i < max) {
+ assertEquals(i, dstServer.mb.getPendingSize());
+ } else {
+ assertEquals(max, dstServer.mb.getPendingSize());
+ }
+ assertTrue(srcSession.send(createMessage("msg"), Route.parse("dst/session")).isAccepted());
+ if (i < max) {
+ Message msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60);
+ assertNotNull(msg);
+ lst.add(msg);
+ } else {
+ Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60);
+ assertNotNull(reply);
+ assertEquals(1, reply.getNumErrors());
+ assertEquals(ErrorCode.SESSION_BUSY, reply.getError(0).getCode());
+ }
+ }
+ for (int i = 0; i < 5; ++i) {
+ Message msg = lst.remove(0);
+ dstSession.acknowledge(msg);
+
+ Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60);
+ assertNotNull(reply);
+ assertFalse(reply.hasErrors());
+ assertNotNull(msg = reply.getMessage());
+ assertTrue(srcSession.send(msg, Route.parse("dst/session")).isAccepted());
+
+ assertNotNull(msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60));
+ lst.add(msg);
+ }
+ while (!lst.isEmpty()) {
+ assertEquals(size * lst.size(), dstServer.mb.getPendingSize());
+ Message msg = lst.remove(0);
+ dstSession.acknowledge(msg);
+
+ Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60);
+ assertNotNull(reply);
+ assertFalse(reply.hasErrors());
+ }
+ assertEquals(0, dstServer.mb.getPendingSize());
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////
+ //
+ // Utilities
+ //
+ ////////////////////////////////////////////////////////////////////////////////
+
+ private static Message createMessage(String msg) {
+ Message ret = new SimpleMessage(msg);
+ ret.getTrace().setLevel(9);
+ return ret;
+ }
+}
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/ConfigAgentTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/ConfigAgentTestCase.java
new file mode 100755
index 00000000000..f6b21e26030
--- /dev/null
+++ b/messagebus/src/test/java/com/yahoo/messagebus/ConfigAgentTestCase.java
@@ -0,0 +1,199 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus;
+
+import com.yahoo.config.subscription.ConfigSet;
+import com.yahoo.config.subscription.ConfigURI;
+import com.yahoo.messagebus.routing.HopSpec;
+import com.yahoo.messagebus.routing.RouteSpec;
+import com.yahoo.messagebus.routing.RoutingSpec;
+import com.yahoo.messagebus.routing.RoutingTableSpec;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.*;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public class ConfigAgentTestCase {
+
+ @Rule
+ public TemporaryFolder tmpFolder = new TemporaryFolder();
+
+ @Test
+ public void testRoutingConfig() throws InterruptedException, IOException {
+ LocalHandler handler = new LocalHandler();
+ assertFalse(testHalf(handler.spec));
+ assertFalse(testFull(handler.spec));
+
+ ConfigSet set = new ConfigSet();
+ set.addBuilder("test", writeFull());
+
+ ConfigAgent agent = new ConfigAgent(ConfigURI.createFromIdAndSource("test", set), handler);
+ assertFalse(testHalf(handler.spec));
+ assertFalse(testFull(handler.spec));
+ agent.subscribe();
+ assertFalse(testHalf(handler.spec));
+ assertTrue(testFull(handler.spec));
+
+ handler.reset();
+ set.addBuilder("test", writeHalf());
+ assertTrue(handler.await(120, TimeUnit.SECONDS));
+ assertTrue(testHalf(handler.spec));
+ assertFalse(testFull(handler.spec));
+
+ handler.reset();
+ set.addBuilder("test", writeFull());
+ assertTrue(handler.await(120, TimeUnit.SECONDS));
+ assertTrue(testFull(handler.spec));
+ assertFalse(testHalf(handler.spec));
+ }
+
+ private boolean testHalf(RoutingSpec spec) {
+ if (spec.getNumTables() != 1) {
+ return false;
+ }
+ assertTables(spec, 1);
+ return true;
+ }
+
+ private boolean testFull(RoutingSpec spec) {
+ if (spec.getNumTables() != 2) {
+ return false;
+ }
+ assertTables(spec, 2);
+ return true;
+ }
+
+ private void assertTables(RoutingSpec spec, int numTables) {
+ assertEquals(numTables, spec.getNumTables());
+ if (numTables > 0) {
+ assertEquals("foo", spec.getTable(0).getProtocol());
+ assertEquals(2, spec.getTable(0).getNumHops());
+ assertEquals("foo-h1", spec.getTable(0).getHop(0).getName());
+ assertEquals("foo-h1-sel", spec.getTable(0).getHop(0).getSelector());
+ assertEquals(2, spec.getTable(0).getHop(0).getNumRecipients());
+ assertEquals("foo-h1-r1", spec.getTable(0).getHop(0).getRecipient(0));
+ assertEquals("foo-h1-r2", spec.getTable(0).getHop(0).getRecipient(1));
+ assertEquals(true, spec.getTable(0).getHop(0).getIgnoreResult());
+ assertEquals("foo-h2", spec.getTable(0).getHop(1).getName());
+ assertEquals("foo-h2-sel", spec.getTable(0).getHop(1).getSelector());
+ assertEquals(2, spec.getTable(0).getHop(1).getNumRecipients());
+ assertEquals("foo-h2-r1", spec.getTable(0).getHop(1).getRecipient(0));
+ assertEquals("foo-h2-r2", spec.getTable(0).getHop(1).getRecipient(1));
+ assertEquals(2, spec.getTable(0).getNumRoutes());
+ assertEquals("foo-r1", spec.getTable(0).getRoute(0).getName());
+ assertEquals(2, spec.getTable(0).getRoute(0).getNumHops());
+ assertEquals("foo-h1", spec.getTable(0).getRoute(0).getHop(0));
+ assertEquals("foo-h2", spec.getTable(0).getRoute(0).getHop(1));
+ assertEquals("foo-r2", spec.getTable(0).getRoute(1).getName());
+ assertEquals(2, spec.getTable(0).getRoute(1).getNumHops());
+ assertEquals("foo-h2", spec.getTable(0).getRoute(1).getHop(0));
+ assertEquals("foo-h1", spec.getTable(0).getRoute(1).getHop(1));
+ }
+ if (numTables > 1) {
+ assertEquals("bar", spec.getTable(1).getProtocol());
+ assertEquals(2, spec.getTable(1).getNumHops());
+ assertEquals("bar-h1", spec.getTable(1).getHop(0).getName());
+ assertEquals("bar-h1-sel", spec.getTable(1).getHop(0).getSelector());
+ assertEquals(2, spec.getTable(1).getHop(0).getNumRecipients());
+ assertEquals("bar-h1-r1", spec.getTable(1).getHop(0).getRecipient(0));
+ assertEquals("bar-h1-r2", spec.getTable(1).getHop(0).getRecipient(1));
+ assertEquals("bar-h2", spec.getTable(1).getHop(1).getName());
+ assertEquals("bar-h2-sel", spec.getTable(1).getHop(1).getSelector());
+ assertEquals(2, spec.getTable(1).getHop(1).getNumRecipients());
+ assertEquals("bar-h2-r1", spec.getTable(1).getHop(1).getRecipient(0));
+ assertEquals("bar-h2-r2", spec.getTable(1).getHop(1).getRecipient(1));
+ assertEquals(2, spec.getTable(1).getNumRoutes());
+ assertEquals("bar-r1", spec.getTable(1).getRoute(0).getName());
+ assertEquals(2, spec.getTable(1).getRoute(0).getNumHops());
+ assertEquals("bar-h1", spec.getTable(1).getRoute(0).getHop(0));
+ assertEquals("bar-h2", spec.getTable(1).getRoute(0).getHop(1));
+ assertEquals("bar-r2", spec.getTable(1).getRoute(1).getName());
+ assertEquals(2, spec.getTable(1).getRoute(1).getNumHops());
+ assertEquals("bar-h2", spec.getTable(1).getRoute(1).getHop(0));
+ assertEquals("bar-h1", spec.getTable(1).getRoute(1).getHop(1));
+ }
+ }
+
+ private static MessagebusConfig.Builder writeHalf() {
+ return writeTables(1);
+ }
+
+ private static MessagebusConfig.Builder writeFull() {
+ return writeTables(2);
+ }
+
+ private static MessagebusConfig.Builder writeTables(int numTables) {
+ MessagebusConfig.Builder builder = new MessagebusConfig.Builder();
+ if (numTables > 0) {
+ MessagebusConfig.Routingtable.Builder table = new MessagebusConfig.Routingtable.Builder();
+ table.protocol("foo");
+ table.hop(getHop("foo-h1", "foo-h1-sel", "foo-h1-r1", "foo-h1-r2", true));
+ table.hop(getHop("foo-h2", "foo-h2-sel", "foo-h2-r1", "foo-h2-r2", false));
+ table.route(getRoute("foo-r1", "foo-h1", "foo-h2"));
+ table.route(getRoute("foo-r2", "foo-h2", "foo-h1"));
+ builder.routingtable(table);
+ }
+ if (numTables > 1) {
+ MessagebusConfig.Routingtable.Builder table = new MessagebusConfig.Routingtable.Builder();
+ table.protocol("bar");
+ table.hop(getHop("bar-h1", "bar-h1-sel", "bar-h1-r1", "bar-h1-r2", false));
+ table.hop(getHop("bar-h2", "bar-h2-sel", "bar-h2-r1", "bar-h2-r2", false));
+ table.route(getRoute("bar-r1", "bar-h1", "bar-h2"));
+ table.route(getRoute("bar-r2", "bar-h2", "bar-h1"));
+ builder.routingtable(table);
+ }
+ return builder;
+ }
+
+ private static MessagebusConfig.Routingtable.Route.Builder getRoute(String name, String hop1, String hop2) {
+ MessagebusConfig.Routingtable.Route.Builder route = new MessagebusConfig.Routingtable.Route.Builder();
+ route.name(name);
+ route.hop(hop1);
+ route.hop(hop2);
+ return route;
+ }
+
+ private static MessagebusConfig.Routingtable.Hop.Builder getHop(String name, String selector, String recipient1, String recipient2, boolean ignoreresult) {
+ MessagebusConfig.Routingtable.Hop.Builder hop = new MessagebusConfig.Routingtable.Hop.Builder();
+ hop.name(name);
+ hop.selector(selector);
+ hop.recipient(recipient1);
+ hop.recipient(recipient2);
+ hop.ignoreresult(ignoreresult);
+ return hop;
+ }
+
+ private static class LocalHandler implements ConfigHandler {
+
+ volatile RoutingSpec spec = new RoutingSpec();
+
+ public void setupRouting(RoutingSpec spec) {
+ this.spec = spec;
+ }
+
+ public void reset() {
+ spec = null;
+ }
+
+ public boolean await(int timeout, TimeUnit unit) throws InterruptedException {
+ long millis = System.currentTimeMillis() + unit.toMillis(timeout);
+ while (spec == null) {
+ long now = System.currentTimeMillis();
+ if (now >= millis) {
+ return false;
+ }
+ Thread.sleep(1000);
+ }
+ return true;
+ }
+ }
+}
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/CustomTimer.java b/messagebus/src/test/java/com/yahoo/messagebus/CustomTimer.java
new file mode 100644
index 00000000000..12749ca9bc4
--- /dev/null
+++ b/messagebus/src/test/java/com/yahoo/messagebus/CustomTimer.java
@@ -0,0 +1,17 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus;
+
+import com.yahoo.concurrent.Timer;
+
+/**
+ * @author <a href="mailto:thomasg@yahoo-inc.com">Thomas Gundersen</a>
+ */
+class CustomTimer implements Timer {
+
+ long millis = 0;
+
+ @Override
+ public long milliTime() {
+ return millis;
+ }
+}
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/ErrorTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/ErrorTestCase.java
new file mode 100755
index 00000000000..e67fc5030ed
--- /dev/null
+++ b/messagebus/src/test/java/com/yahoo/messagebus/ErrorTestCase.java
@@ -0,0 +1,92 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus;
+
+import com.yahoo.jrt.slobrok.server.Slobrok;
+import com.yahoo.messagebus.network.rpc.test.TestServer;
+import com.yahoo.messagebus.routing.RoutingTableSpec;
+import com.yahoo.messagebus.test.Receptor;
+import com.yahoo.messagebus.test.SimpleMessage;
+import com.yahoo.messagebus.test.SimpleProtocol;
+import org.junit.Test;
+
+import java.util.Arrays;
+
+import static org.junit.Assert.*;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public class ErrorTestCase {
+
+ @Test
+ public void requireThatAccessorsWork() {
+ Error err = new Error(69, "foo");
+ assertEquals(69, err.getCode());
+ assertEquals("foo", err.getMessage());
+
+ assertFalse(new Error(ErrorCode.TRANSIENT_ERROR, "foo").isFatal());
+ assertFalse(new Error(ErrorCode.TRANSIENT_ERROR + 1, "foo").isFatal());
+ assertTrue(new Error(ErrorCode.FATAL_ERROR, "foo").isFatal());
+ assertTrue(new Error(ErrorCode.FATAL_ERROR + 1, "foo").isFatal());
+ }
+
+ @Test
+ public void requireThatErrorIsPropagated() throws Exception {
+ RoutingTableSpec table = new RoutingTableSpec(SimpleProtocol.NAME);
+ table.addHop("itr", "test/itr/session", Arrays.asList("test/itr/session"));
+ table.addHop("dst", "test/dst/session", Arrays.asList("test/dst/session"));
+ table.addRoute("test", Arrays.asList("itr", "dst"));
+
+ Slobrok slobrok = new Slobrok();
+ TestServer src = new TestServer("test/src", table, slobrok, null, null);
+ TestServer itr = new TestServer("test/itr", table, slobrok, null, null);
+ TestServer dst = new TestServer("test/dst", table, slobrok, null, null);
+
+ Receptor ss_rr = new Receptor();
+ SourceSession ss = src.mb.createSourceSession(ss_rr);
+
+ Receptor is_mr = new Receptor();
+ Receptor is_rr = new Receptor();
+ IntermediateSession is = itr.mb.createIntermediateSession("session", true, is_mr, is_rr);
+
+ Receptor ds_mr = new Receptor();
+ DestinationSession ds = dst.mb.createDestinationSession("session", true, ds_mr);
+
+ src.waitSlobrok("test/itr/session", 1);
+ src.waitSlobrok("test/dst/session", 1);
+ itr.waitSlobrok("test/dst/session", 1);
+
+ for (int i = 0; i < 5; i++) {
+ assertTrue(ss.send(new SimpleMessage("msg"), "test").isAccepted());
+ Message msg = is_mr.getMessage(60);
+ assertNotNull(msg);
+ is.forward(msg);
+
+ assertNotNull(msg = ds_mr.getMessage(60));
+ Reply reply = new EmptyReply();
+ msg.swapState(reply);
+ reply.addError(new Error(ErrorCode.APP_FATAL_ERROR, "fatality"));
+ ds.reply(reply);
+
+ assertNotNull(reply = is_rr.getReply(60));
+ assertEquals(reply.getNumErrors(), 1);
+ assertEquals(reply.getError(0).getService(), "test/dst/session");
+ reply.addError(new Error(ErrorCode.APP_FATAL_ERROR, "fatality"));
+ is.forward(reply);
+
+ assertNotNull(reply = ss_rr.getReply(60));
+ assertEquals(reply.getNumErrors(), 2);
+ assertEquals(reply.getError(0).getService(), "test/dst/session");
+ assertEquals(reply.getError(1).getService(), "test/itr/session");
+ }
+
+ ss.destroy();
+ is.destroy();
+ ds.destroy();
+
+ dst.destroy();
+ itr.destroy();
+ src.destroy();
+ slobrok.stop();
+ }
+}
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/MessageBusTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/MessageBusTestCase.java
new file mode 100644
index 00000000000..95b4e3663e8
--- /dev/null
+++ b/messagebus/src/test/java/com/yahoo/messagebus/MessageBusTestCase.java
@@ -0,0 +1,144 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus;
+
+import com.yahoo.jrt.ListenFailedException;
+import com.yahoo.jrt.Spec;
+import com.yahoo.jrt.slobrok.server.Slobrok;
+import com.yahoo.messagebus.network.rpc.RPCNetwork;
+import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
+import com.yahoo.messagebus.network.rpc.test.TestServer;
+import com.yahoo.messagebus.routing.RetryTransientErrorsPolicy;
+import com.yahoo.messagebus.routing.Route;
+import com.yahoo.messagebus.routing.RoutingPolicy;
+import com.yahoo.messagebus.routing.test.CustomPolicyFactory;
+import com.yahoo.messagebus.test.Receptor;
+import com.yahoo.messagebus.test.SimpleMessage;
+import com.yahoo.messagebus.test.SimpleProtocol;
+import org.junit.Test;
+
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.*;
+
+public class MessageBusTestCase {
+
+ @Test
+ public void requireThatBucketSequencingWithResenderEnabledCausesError() throws ListenFailedException {
+ Slobrok slobrok = new Slobrok();
+ TestServer server = new TestServer(new MessageBusParams()
+ .addProtocol(new SimpleProtocol())
+ .setRetryPolicy(new RetryTransientErrorsPolicy()),
+ new RPCNetworkParams()
+ .setSlobrokConfigId(slobrok.configId()));
+ Receptor receptor = new Receptor();
+ SourceSession session = server.mb.createSourceSession(
+ new SourceSessionParams().setTimeout(600.0).setReplyHandler(receptor));
+ assertTrue(session.send(new SimpleMessage("foo") {
+ @Override
+ public boolean hasBucketSequence() {
+ return true;
+ }
+ }.setRoute(Route.parse("bar"))).isAccepted());
+ Reply reply = receptor.getReply(60);
+ assertNotNull(reply);
+ assertEquals(1, reply.getNumErrors());
+ assertEquals(ErrorCode.SEQUENCE_ERROR, reply.getError(0).getCode());
+ session.destroy();
+ server.destroy();
+ slobrok.stop();
+ }
+
+ @Test
+ public void testConnectionSpec() throws ListenFailedException, UnknownHostException {
+ // Setup servers and sessions.
+ Slobrok slobrok = new Slobrok();
+ List<TestServer> servers = new ArrayList<>();
+
+ TestServer srcServer = new TestServer("feeder", null, slobrok, null, null);
+ servers.add(srcServer);
+ SourceSession src = servers.get(0).mb.createSourceSession(new Receptor());
+
+ List<IntermediateSession> sessions = new ArrayList<>();
+ for (int i = 0; i < 10; ++i) {
+ TestServer server = new TestServer("intermediate/" + i, null, slobrok, null, null);
+ servers.add(server);
+ sessions.add(server.mb.createIntermediateSession("session", true, new Receptor(), new Receptor()));
+ }
+
+ TestServer dstServer = new TestServer("destination", null, slobrok, null, null);
+ DestinationSession dst = dstServer.mb.createDestinationSession("session", true, new Receptor());
+
+ assertTrue(srcServer.waitSlobrok("intermediate/*/session", sessions.size()));
+ assertTrue(srcServer.waitSlobrok("destination/session", 1));
+
+ StringBuilder route = new StringBuilder();
+ for (int i = 0; i < sessions.size(); i++) {
+ route.append("intermediate/").append(i).append("/session ");
+ route.append(sessions.get(i).getConnectionSpec()).append(" ");
+ }
+ route.append(dst.getConnectionSpec());
+
+ Message msg = new SimpleMessage("empty");
+ assertTrue(src.send(msg, Route.parse(route.toString())).isAccepted());
+ for (IntermediateSession itr : sessions) {
+ // Received using session name.
+ assertNotNull(msg = ((Receptor)itr.getMessageHandler()).getMessage(60));
+ itr.forward(msg);
+
+ // Received using connection spec.
+ assertNotNull(msg = ((Receptor)itr.getMessageHandler()).getMessage(60));
+ itr.forward(msg);
+ }
+ assertNotNull(msg = ((Receptor)dst.getMessageHandler()).getMessage(60));
+ dst.acknowledge(msg);
+ for (int i = sessions.size(); --i >= 0;) {
+ IntermediateSession itr = sessions.get(i);
+
+ // Received for connection spec.
+ Reply reply = ((Receptor)itr.getReplyHandler()).getReply(60);
+ assertNotNull(reply);
+ itr.forward(reply);
+
+ // Received for session name.
+ assertNotNull(reply = ((Receptor)itr.getReplyHandler()).getReply(60));
+ itr.forward(reply);
+ }
+ assertNotNull(((Receptor)src.getReplyHandler()).getReply(60));
+
+ // Cleanup.
+ for (IntermediateSession session : sessions) {
+ session.destroy();
+ }
+ for (TestServer server : servers) {
+ server.destroy();
+ }
+ slobrok.stop();
+ }
+
+ @Test
+ public void testRoutingPolicyCache() throws ListenFailedException, UnknownHostException {
+ Slobrok slobrok = new Slobrok();
+ String config = "slobrok[1]\nslobrok[0].connectionspec \"" + new Spec("localhost", slobrok.port()).toString() + "\"\n";
+ SimpleProtocol protocol = new SimpleProtocol();
+ protocol.addPolicyFactory("Custom", new CustomPolicyFactory());
+ MessageBus bus = new MessageBus(new RPCNetwork(new RPCNetworkParams().setSlobrokConfigId("raw:" + config)),
+ new MessageBusParams().addProtocol(protocol));
+
+ RoutingPolicy all = bus.getRoutingPolicy(SimpleProtocol.NAME, "Custom", null);
+ assertNotNull(all);
+
+ RoutingPolicy ref = bus.getRoutingPolicy(SimpleProtocol.NAME, "Custom", null);
+ assertNotNull(ref);
+ assertSame(all, ref);
+
+ RoutingPolicy allArg = bus.getRoutingPolicy(SimpleProtocol.NAME, "Custom", "Arg");
+ assertNotNull(allArg);
+ assertNotSame(all, allArg);
+
+ RoutingPolicy refArg = bus.getRoutingPolicy(SimpleProtocol.NAME, "Custom", "Arg");
+ assertNotNull(refArg);
+ assertSame(allArg, refArg);
+ }
+}
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/MessengerTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/MessengerTestCase.java
new file mode 100644
index 00000000000..37a99381cd1
--- /dev/null
+++ b/messagebus/src/test/java/com/yahoo/messagebus/MessengerTestCase.java
@@ -0,0 +1,124 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus;
+
+import org.junit.Test;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public class MessengerTestCase {
+
+ @Test
+ public void requireThatSyncWithSelfDoesNotCauseDeadLock() throws InterruptedException {
+ final Messenger msn = new Messenger();
+ msn.start();
+
+ final CountDownLatch latch = new CountDownLatch(1);
+ msn.enqueue(new Messenger.Task() {
+
+ @Override
+ public void run() {
+ msn.sync();
+ }
+
+ @Override
+ public void destroy() {
+ latch.countDown();
+ }
+ });
+ assertTrue(latch.await(60, TimeUnit.SECONDS));
+ }
+
+ @Test
+ public void requireThatTaskIsExecuted() throws InterruptedException {
+ Messenger msn = new Messenger();
+ msn.start();
+ assertTrue(tryMessenger(msn));
+ }
+
+ @Test
+ public void requireThatRunExceptionIsCaught() throws InterruptedException {
+ Messenger msn = new Messenger();
+ msn.start();
+ msn.enqueue(new Messenger.Task() {
+ @Override
+ public void run() {
+ throw new RuntimeException();
+ }
+
+ @Override
+ public void destroy() {
+
+ }
+ });
+ assertTrue(tryMessenger(msn));
+ }
+
+ @Test
+ public void requireThatDestroyExceptionIsCaught() throws InterruptedException {
+ Messenger msn = new Messenger();
+ msn.start();
+ msn.enqueue(new Messenger.Task() {
+ @Override
+ public void run() {
+
+ }
+
+ @Override
+ public void destroy() {
+ throw new RuntimeException();
+ }
+ });
+ assertTrue(tryMessenger(msn));
+ }
+
+ @Test
+ public void requireThatRunAndDestroyExceptionsAreCaught() throws InterruptedException {
+ Messenger msn = new Messenger();
+ msn.start();
+ msn.enqueue(new Messenger.Task() {
+ @Override
+ public void run() {
+ throw new RuntimeException();
+ }
+
+ @Override
+ public void destroy() {
+ throw new RuntimeException();
+ }
+ });
+ assertTrue(tryMessenger(msn));
+ }
+
+ private static boolean tryMessenger(Messenger msn) {
+ MyTask task = new MyTask();
+ msn.enqueue(task);
+ try {
+ return task.runLatch.await(60, TimeUnit.SECONDS) &&
+ task.destroyLatch.await(60, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ return false;
+ }
+ }
+
+ private static class MyTask implements Messenger.Task {
+
+ final CountDownLatch runLatch = new CountDownLatch(1);
+ final CountDownLatch destroyLatch = new CountDownLatch(1);
+
+ @Override
+ public void run() {
+ runLatch.countDown();
+ }
+
+ @Override
+ public void destroy() {
+ destroyLatch.countDown();
+ }
+ }
+}
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/ProtocolRepositoryTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/ProtocolRepositoryTestCase.java
new file mode 100644
index 00000000000..ddb21baadab
--- /dev/null
+++ b/messagebus/src/test/java/com/yahoo/messagebus/ProtocolRepositoryTestCase.java
@@ -0,0 +1,103 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus;
+
+import com.yahoo.messagebus.routing.RoutingContext;
+import com.yahoo.messagebus.routing.RoutingPolicy;
+import com.yahoo.messagebus.test.SimpleProtocol;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public class ProtocolRepositoryTestCase {
+
+ @Test
+ public void requireThatPolicyCanBeNull() {
+ ProtocolRepository repo = new ProtocolRepository();
+ SimpleProtocol protocol = new SimpleProtocol();
+ repo.putProtocol(protocol);
+ assertNull(repo.getRoutingPolicy(SimpleProtocol.NAME, "Custom", null));
+ }
+
+ @Test
+ public void requireThatPolicyCanBeCreated() {
+ ProtocolRepository repo = new ProtocolRepository();
+ SimpleProtocol protocol = new SimpleProtocol();
+ protocol.addPolicyFactory("Custom", new MyFactory());
+ repo.putProtocol(protocol);
+ assertNotNull(repo.getRoutingPolicy(SimpleProtocol.NAME, "Custom", null));
+ }
+
+ @Test
+ public void requireThatPolicyIsCached() {
+ ProtocolRepository repo = new ProtocolRepository();
+ SimpleProtocol protocol = new SimpleProtocol();
+ protocol.addPolicyFactory("Custom", new MyFactory());
+ repo.putProtocol(protocol);
+
+ RoutingPolicy prev = repo.getRoutingPolicy(SimpleProtocol.NAME, "Custom", null);
+ assertNotNull(prev);
+
+ RoutingPolicy next = repo.getRoutingPolicy(SimpleProtocol.NAME, "Custom", null);
+ assertNotNull(next);
+ assertSame(prev, next);
+ }
+
+ @Test
+ public void requireThatPolicyParamIsPartOfCacheKey() {
+ ProtocolRepository repo = new ProtocolRepository();
+ SimpleProtocol protocol = new SimpleProtocol();
+ protocol.addPolicyFactory("Custom", new MyFactory());
+ repo.putProtocol(protocol);
+
+ RoutingPolicy prev = repo.getRoutingPolicy(SimpleProtocol.NAME, "Custom", "foo");
+ assertNotNull(prev);
+
+ RoutingPolicy next = repo.getRoutingPolicy(SimpleProtocol.NAME, "Custom", "bar");
+ assertNotNull(next);
+ assertNotSame(prev, next);
+ }
+
+ @Test
+ public void requireThatCreatePolicyExceptionIsCaught() {
+ ProtocolRepository repo = new ProtocolRepository();
+ SimpleProtocol protocol = new SimpleProtocol();
+ protocol.addPolicyFactory("Custom", new SimpleProtocol.PolicyFactory() {
+
+ @Override
+ public RoutingPolicy create(String param) {
+ throw new RuntimeException();
+ }
+ });
+ repo.putProtocol(protocol);
+ assertNull(repo.getRoutingPolicy(SimpleProtocol.NAME, "Custom", null));
+ }
+
+ private static class MyFactory implements SimpleProtocol.PolicyFactory {
+
+ @Override
+ public RoutingPolicy create(String param) {
+ return new MyPolicy();
+ }
+ }
+
+ private static class MyPolicy implements RoutingPolicy {
+
+ @Override
+ public void select(RoutingContext context) {
+
+ }
+
+ @Override
+ public void merge(RoutingContext context) {
+
+ }
+
+ @Override
+ public void destroy() {
+
+ }
+ }
+}
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/RateThrottlingTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/RateThrottlingTestCase.java
new file mode 100644
index 00000000000..dc4beb5cf9f
--- /dev/null
+++ b/messagebus/src/test/java/com/yahoo/messagebus/RateThrottlingTestCase.java
@@ -0,0 +1,39 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus;
+
+import com.yahoo.messagebus.test.SimpleMessage;
+import junit.framework.TestCase;
+
+public class RateThrottlingTestCase extends TestCase {
+
+ public void testPending() {
+ CustomTimer timer = new CustomTimer();
+ RateThrottlingPolicy policy = new RateThrottlingPolicy(5.0, timer);
+ policy.setMaxPendingCount(200);
+
+ // Check that we obey the max still.
+ assertFalse(policy.canSend(new SimpleMessage("test"), 300));
+ }
+
+ public int getActualRate(double desiredRate) {
+ CustomTimer timer = new CustomTimer();
+ RateThrottlingPolicy policy = new RateThrottlingPolicy(desiredRate, timer);
+
+ int ok = 0;
+ for (int i = 0; i < 10000; ++i) {
+ if (policy.canSend(new SimpleMessage("test"), 0)) {
+ ok++;
+ }
+ timer.millis += 10;
+ }
+
+ return ok;
+ }
+
+ public void testRates() {
+ assertEquals(10, getActualRate(0.1), 1);
+ assertEquals(1000, getActualRate(10), 100);
+ assertEquals(500, getActualRate(5), 50);
+ assertEquals(100, getActualRate(1), 10);
+ }
+}
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/RoutableTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/RoutableTestCase.java
new file mode 100755
index 00000000000..4e76ab86889
--- /dev/null
+++ b/messagebus/src/test/java/com/yahoo/messagebus/RoutableTestCase.java
@@ -0,0 +1,114 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus;
+
+import com.yahoo.jrt.ListenFailedException;
+import com.yahoo.jrt.slobrok.server.Slobrok;
+import com.yahoo.messagebus.network.rpc.test.TestServer;
+import com.yahoo.messagebus.routing.Route;
+import com.yahoo.messagebus.test.Receptor;
+import com.yahoo.messagebus.test.SimpleMessage;
+import com.yahoo.messagebus.test.SimpleReply;
+
+import java.net.UnknownHostException;
+
+public class RoutableTestCase extends junit.framework.TestCase {
+
+ public void testMessageContext() throws ListenFailedException, UnknownHostException {
+ Slobrok slobrok = new Slobrok();
+ TestServer srcServer = new TestServer("src", null, slobrok, null, null);
+ TestServer dstServer = new TestServer("dst", null, slobrok, null, null);
+ SourceSession srcSession = srcServer.mb.createSourceSession(
+ new Receptor(),
+ new SourceSessionParams().setTimeout(600.0));
+ DestinationSession dstSession = dstServer.mb.createDestinationSession("session", true, new Receptor());
+
+ assertTrue(srcServer.waitSlobrok("dst/session", 1));
+
+ Object context = new Object();
+ Message msg = new SimpleMessage("msg");
+ msg.setContext(context);
+ assertTrue(srcSession.send(msg, "dst/session", true).isAccepted());
+
+ assertNotNull(msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60));
+ dstSession.acknowledge(msg);
+
+ Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60);
+ assertNotNull(reply);
+ assertSame(reply.getContext(), context);
+
+ srcSession.destroy();
+ srcServer.destroy();
+ dstSession.destroy();
+ dstServer.destroy();
+ slobrok.stop();
+ }
+
+ public void testMessageSwapState() {
+ Message foo = new SimpleMessage("foo");
+ Route fooRoute = Route.parse("foo");
+ foo.setRoute(fooRoute);
+ foo.setRetry(1);
+ foo.setTimeReceivedNow();
+ foo.setTimeRemaining(2);
+
+ Message bar = new SimpleMessage("bar");
+ Route barRoute = Route.parse("bar");
+ bar.setRoute(barRoute);
+ bar.setRetry(3);
+ bar.setTimeReceivedNow();
+ bar.setTimeRemaining(4);
+
+ foo.swapState(bar);
+ assertEquals(barRoute, foo.getRoute());
+ assertEquals(fooRoute, bar.getRoute());
+ assertEquals(3, foo.getRetry());
+ assertEquals(1, bar.getRetry());
+ assertTrue(foo.getTimeReceived() >= bar.getTimeReceived());
+ assertEquals(4, foo.getTimeRemaining());
+ assertEquals(2, bar.getTimeRemaining());
+ }
+
+ public void testReplySwapState() {
+ Reply foo = new SimpleReply("foo");
+ Message fooMsg = new SimpleMessage("foo");
+ foo.setMessage(fooMsg);
+ foo.setRetryDelay(1);
+ foo.addError(new Error(ErrorCode.APP_FATAL_ERROR, "fatal"));
+ foo.addError(new Error(ErrorCode.APP_TRANSIENT_ERROR, "transient"));
+
+ Reply bar = new SimpleReply("bar");
+ Message barMsg = new SimpleMessage("bar");
+ bar.setMessage(barMsg);
+ bar.setRetryDelay(2);
+ bar.addError(new Error(ErrorCode.ERROR_LIMIT, "err"));
+
+ foo.swapState(bar);
+ assertEquals(barMsg, foo.getMessage());
+ assertEquals(fooMsg, bar.getMessage());
+ assertEquals(2.0, foo.getRetryDelay());
+ assertEquals(1.0, bar.getRetryDelay());
+ assertEquals(1, foo.getNumErrors());
+ assertEquals(2, bar.getNumErrors());
+ }
+
+ public void testMessageDiscard() {
+ Receptor handler = new Receptor();
+ Message msg = new SimpleMessage("foo");
+ msg.pushHandler(handler);
+ msg.discard();
+
+ assertNull(handler.getReply(0));
+ }
+
+ public void testReplyDiscard() {
+ Receptor handler = new Receptor();
+ Message msg = new SimpleMessage("foo");
+ msg.pushHandler(handler);
+
+ Reply reply = new SimpleReply("bar");
+ reply.swapState(msg);
+ reply.discard();
+
+ assertNull(handler.getReply(0));
+ }
+}
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/SendProxyTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/SendProxyTestCase.java
new file mode 100644
index 00000000000..6eb7257328b
--- /dev/null
+++ b/messagebus/src/test/java/com/yahoo/messagebus/SendProxyTestCase.java
@@ -0,0 +1,169 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus;
+
+import com.yahoo.component.Vtag;
+import com.yahoo.jrt.ListenFailedException;
+import com.yahoo.jrt.slobrok.server.Slobrok;
+import com.yahoo.log.LogLevel;
+import com.yahoo.messagebus.network.Identity;
+import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
+import com.yahoo.messagebus.network.rpc.test.TestServer;
+import com.yahoo.messagebus.routing.Route;
+import com.yahoo.messagebus.test.Receptor;
+import com.yahoo.messagebus.test.SimpleMessage;
+import com.yahoo.messagebus.test.SimpleProtocol;
+import com.yahoo.messagebus.test.SimpleReply;
+import junit.framework.TestCase;
+
+import java.net.UnknownHostException;
+import java.util.logging.Handler;
+import java.util.logging.LogRecord;
+import java.util.logging.Logger;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public class SendProxyTestCase extends TestCase {
+
+ Slobrok slobrok;
+ TestServer srcServer, dstServer;
+ SourceSession srcSession;
+ DestinationSession dstSession;
+
+ @Override
+ public void setUp() throws UnknownHostException, ListenFailedException {
+ slobrok = new Slobrok();
+ dstServer = new TestServer(new MessageBusParams().addProtocol(new SimpleProtocol()),
+ new RPCNetworkParams().setIdentity(new Identity("dst")).setSlobrokConfigId(TestServer.getSlobrokConfig(slobrok)));
+ dstSession = dstServer.mb.createDestinationSession(new DestinationSessionParams().setName("session").setMessageHandler(new Receptor()));
+ srcServer = new TestServer(new MessageBusParams().addProtocol(new SimpleProtocol()),
+ new RPCNetworkParams().setSlobrokConfigId(TestServer.getSlobrokConfig(slobrok)));
+ srcSession = srcServer.mb.createSourceSession(
+ new SourceSessionParams().setTimeout(600.0).setThrottlePolicy(null).setReplyHandler(new Receptor()));
+ assertTrue(srcServer.waitSlobrok("dst/session", 1));
+ }
+
+ @Override
+ public void tearDown() {
+ slobrok.stop();
+ dstSession.destroy();
+ dstServer.destroy();
+ srcSession.destroy();
+ srcServer.destroy();
+ }
+
+ public void testTraceByLogLevel() {
+ Logger log = Logger.getLogger(SendProxy.class.getName());
+ LogHandler logHandler = new LogHandler();
+ log.addHandler(logHandler);
+
+ log.setLevel(LogLevel.INFO);
+ sendMessage(0, null);
+ assertNull(logHandler.trace);
+
+ log.setLevel(LogLevel.DEBUG);
+ sendMessage(0, null);
+ assertNull(logHandler.trace);
+
+ sendMessage(1, new Error(ErrorCode.FATAL_ERROR, "err"));
+ assertNull(logHandler.trace);
+
+ sendMessage(0, new Error(ErrorCode.FATAL_ERROR, "err"));
+ assertEquals("Trace for reply with error(s):\n" +
+ "<trace>\n" +
+ " <trace>\n" +
+ " Sending message (version ${VERSION}) from client to 'dst/session' with x seconds timeout.\n" +
+ " <trace>\n" +
+ " Message (type 1) received at 'dst' for session 'session'.\n" +
+ " [FATAL_ERROR @ localhost]: err\n" +
+ " Sending reply (version ${VERSION}) from 'dst'.\n" +
+ " </trace>\n" +
+ " Reply (type 2) received at client.\n" +
+ " </trace>\n" +
+ "</trace>\n", logHandler.trace);
+ logHandler.trace = null;
+
+ log.setLevel(LogLevel.SPAM);
+ sendMessage(1, null);
+ assertNull(logHandler.trace);
+
+ sendMessage(0, null);
+ assertEquals("Trace for reply:\n" +
+ "<trace>\n" +
+ " <trace>\n" +
+ " Sending message (version ${VERSION}) from client to 'dst/session' with x seconds timeout.\n" +
+ " <trace>\n" +
+ " Message (type 1) received at 'dst' for session 'session'.\n" +
+ " Sending reply (version ${VERSION}) from 'dst'.\n" +
+ " </trace>\n" +
+ " Reply (type 0) received at client.\n" +
+ " </trace>\n" +
+ "</trace>\n", logHandler.trace);
+ logHandler.trace = null;
+
+ sendMessage(1, new Error(ErrorCode.FATAL_ERROR, "err"));
+ assertNull(logHandler.trace);
+
+ sendMessage(0, new Error(ErrorCode.FATAL_ERROR, "err"));
+ assertEquals("Trace for reply with error(s):\n" +
+ "<trace>\n" +
+ " <trace>\n" +
+ " Sending message (version ${VERSION}) from client to 'dst/session' with x seconds timeout.\n" +
+ " <trace>\n" +
+ " Message (type 1) received at 'dst' for session 'session'.\n" +
+ " [FATAL_ERROR @ localhost]: err\n" +
+ " Sending reply (version ${VERSION}) from 'dst'.\n" +
+ " </trace>\n" +
+ " Reply (type 2) received at client.\n" +
+ " </trace>\n" +
+ "</trace>\n", logHandler.trace);
+ logHandler.trace = null;
+ }
+
+ private void sendMessage(int traceLevel, Error err) {
+ Message msg = new SimpleMessage("foo");
+ msg.getTrace().setLevel(traceLevel);
+ assertTrue(srcSession.send(msg, Route.parse("dst/session")).isAccepted());
+ assertNotNull(msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60));
+ if (err != null) {
+ Reply reply = new SimpleReply("bar");
+ reply.swapState(msg);
+ reply.addError(err);
+ dstSession.reply(reply);
+ } else {
+ dstSession.acknowledge(msg);
+ }
+ Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60);
+ assertNotNull(reply);
+ }
+
+ private static class LogHandler extends Handler {
+
+ String trace = null;
+
+ @Override
+ public void publish(LogRecord record) {
+ String msg = record.getMessage();
+ if (msg.startsWith("Trace ")) {
+ msg = msg.replaceAll("\\[.*\\] ", "");
+ msg = msg.replaceAll("[0-9]+\\.[0-9]+ seconds", "x seconds");
+
+ String ver = Vtag.currentVersion.toString();
+ for (int i = msg.indexOf(ver); i >= 0; i = msg.indexOf(ver, i)) {
+ msg = msg.substring(0, i) + "${VERSION}" + msg.substring(i + ver.length());
+ }
+ trace = msg;
+ }
+ }
+
+ @Override
+ public void flush() {
+ // empty
+ }
+
+ @Override
+ public void close() throws SecurityException {
+ // empty
+ }
+ }
+}
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/SequencerTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/SequencerTestCase.java
new file mode 100644
index 00000000000..bd053e5ad63
--- /dev/null
+++ b/messagebus/src/test/java/com/yahoo/messagebus/SequencerTestCase.java
@@ -0,0 +1,179 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus;
+
+import com.yahoo.messagebus.test.SimpleMessage;
+
+import java.util.LinkedList;
+import java.util.Queue;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public class SequencerTestCase extends junit.framework.TestCase {
+
+ public void testSyncNone() {
+ TestQueue src = new TestQueue();
+ TestQueue dst = new TestQueue();
+ QueueSender sender = new QueueSender(dst);
+ Sequencer seq = new Sequencer(sender);
+
+ seq.handleMessage(src.createMessage(false, 0));
+ seq.handleMessage(src.createMessage(false, 0));
+ seq.handleMessage(src.createMessage(false, 0));
+ seq.handleMessage(src.createMessage(false, 0));
+ seq.handleMessage(src.createMessage(false, 0));
+ assertEquals(0, src.size());
+ assertEquals(5, dst.size());
+
+ dst.replyNext();
+ dst.replyNext();
+ dst.replyNext();
+ dst.replyNext();
+ dst.replyNext();
+ assertEquals(5, src.size());
+ assertEquals(0, dst.size());
+
+ src.checkReply(false, 0);
+ src.checkReply(false, 0);
+ src.checkReply(false, 0);
+ src.checkReply(false, 0);
+ src.checkReply(false, 0);
+ assertEquals(0, src.size());
+ assertEquals(0, dst.size());
+ }
+
+ public void testSyncId() {
+ TestQueue src = new TestQueue();
+ TestQueue dst = new TestQueue();
+ QueueSender sender = new QueueSender(dst);
+ Sequencer seq = new Sequencer(sender);
+
+ seq.handleMessage(src.createMessage(true, 1L));
+ seq.handleMessage(src.createMessage(true, 2L));
+ seq.handleMessage(src.createMessage(true, 3L));
+ seq.handleMessage(src.createMessage(true, 4L));
+ seq.handleMessage(src.createMessage(true, 5L));
+ assertEquals(0, src.size());
+ assertEquals(5, dst.size());
+
+ seq.handleMessage(src.createMessage(true, 1L));
+ seq.handleMessage(src.createMessage(true, 5L));
+ seq.handleMessage(src.createMessage(true, 2L));
+ seq.handleMessage(src.createMessage(true, 10L));
+ seq.handleMessage(src.createMessage(true, 4L));
+ seq.handleMessage(src.createMessage(true, 3L));
+ assertEquals(0, src.size());
+ assertEquals(6, dst.size());
+
+ dst.replyNext();
+ dst.replyNext();
+ dst.replyNext();
+ dst.replyNext();
+ dst.replyNext();
+ assertEquals(5, src.size());
+ assertEquals(6, dst.size());
+
+ dst.replyNext();
+ dst.replyNext();
+ dst.replyNext();
+ dst.replyNext();
+ dst.replyNext();
+ dst.replyNext();
+ assertEquals(11, src.size());
+ assertEquals(0, dst.size());
+
+ src.checkReply(true, 1);
+ src.checkReply(true, 2);
+ src.checkReply(true, 3);
+ src.checkReply(true, 4);
+ src.checkReply(true, 5);
+ src.checkReply(true, 10);
+ src.checkReply(true, 1);
+ src.checkReply(true, 2);
+ src.checkReply(true, 3);
+ src.checkReply(true, 4);
+ src.checkReply(true, 5);
+ assertEquals(0, src.size());
+ assertEquals(0, dst.size());
+ }
+
+ @SuppressWarnings("serial")
+ private static class TestQueue extends LinkedList<Routable> implements ReplyHandler {
+
+ void checkReply(boolean hasSeqId, long seqId) {
+ if (size() == 0) {
+ throw new IllegalStateException("No routable in queue.");
+ }
+ Routable obj = remove();
+ assertTrue(obj instanceof Reply);
+
+ Reply reply = (Reply)obj;
+ Message msg = reply.getMessage();
+ assertNotNull(msg);
+
+ assertEquals(hasSeqId, msg.hasSequenceId());
+ if (hasSeqId) {
+ assertEquals(seqId, msg.getSequenceId());
+ }
+ }
+
+ public void handleReply(Reply reply) {
+ add(reply);
+ }
+
+ void replyNext() {
+ Routable obj = remove();
+ assertTrue(obj instanceof Message);
+ Message msg = (Message)obj;
+
+ Reply reply = new EmptyReply();
+ reply.swapState(msg);
+ reply.setMessage(msg);
+ ReplyHandler handler = reply.popHandler();
+ handler.handleReply(reply);
+ }
+
+ Message createMessage(final boolean hasSeqId, final long seqId) {
+ Message ret = new MyMessage(hasSeqId, seqId);
+ ret.pushHandler(this);
+ return ret;
+ }
+ }
+
+ private static class QueueSender implements MessageHandler {
+
+ Queue<Routable> queue;
+
+ QueueSender(Queue<Routable> queue) {
+ this.queue = queue;
+ }
+
+ @Override
+ public void handleMessage(Message msg) {
+ queue.offer(msg);
+ }
+ }
+
+ private static class MyMessage extends SimpleMessage {
+
+ final boolean hasSeqId;
+ final long seqId;
+
+ MyMessage(boolean hasSeqId, long seqId) {
+ super("foo");
+ this.hasSeqId = hasSeqId;
+ this.seqId = seqId;
+ }
+
+ @Override
+ public boolean hasSequenceId() {
+ return hasSeqId;
+ }
+
+ @Override
+ public long getSequenceId() {
+ return seqId;
+ }
+ }
+}
+
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/SimpleTripTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/SimpleTripTestCase.java
new file mode 100755
index 00000000000..2795758d922
--- /dev/null
+++ b/messagebus/src/test/java/com/yahoo/messagebus/SimpleTripTestCase.java
@@ -0,0 +1,53 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus;
+
+import com.yahoo.jrt.ListenFailedException;
+import com.yahoo.jrt.slobrok.server.Slobrok;
+import com.yahoo.messagebus.network.Identity;
+import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
+import com.yahoo.messagebus.network.rpc.test.TestServer;
+import com.yahoo.messagebus.routing.Route;
+import com.yahoo.messagebus.test.Receptor;
+import com.yahoo.messagebus.test.SimpleMessage;
+import com.yahoo.messagebus.test.SimpleProtocol;
+import com.yahoo.messagebus.test.SimpleReply;
+
+import java.net.UnknownHostException;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public class SimpleTripTestCase extends junit.framework.TestCase {
+
+ public void testSimpleTrip() throws ListenFailedException, UnknownHostException {
+ Slobrok slobrok = new Slobrok();
+ TestServer server = new TestServer(new MessageBusParams().addProtocol(new SimpleProtocol()),
+ new RPCNetworkParams()
+ .setIdentity(new Identity("srv"))
+ .setSlobrokConfigId(TestServer.getSlobrokConfig(slobrok)));
+ DestinationSession dst = server.mb.createDestinationSession(new DestinationSessionParams().setName("session").setMessageHandler(new Receptor()));
+ SourceSession src = server.mb.createSourceSession(
+ new SourceSessionParams().setTimeout(600.0).setReplyHandler(new Receptor()));
+ assertTrue(server.waitSlobrok("srv/session", 1));
+
+ assertTrue(src.send(new SimpleMessage("msg"), Route.parse("srv/session")).isAccepted());
+ Message msg = ((Receptor)dst.getMessageHandler()).getMessage(60);
+ assertNotNull(msg);
+ assertEquals(SimpleProtocol.NAME, msg.getProtocol());
+ assertEquals(SimpleProtocol.MESSAGE, msg.getType());
+ assertEquals("msg", ((SimpleMessage)msg).getValue());
+
+ Reply reply = new SimpleReply("reply");
+ reply.swapState(msg);
+ dst.reply(reply);
+
+ assertNotNull(reply = ((Receptor)src.getReplyHandler()).getReply(60));
+ assertEquals(SimpleProtocol.NAME, reply.getProtocol());
+ assertEquals(SimpleProtocol.REPLY, reply.getType());
+ assertEquals("reply", ((SimpleReply)reply).getValue());
+
+ src.destroy();
+ dst.destroy();
+ server.destroy();
+ }
+}
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/ThrottlerTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/ThrottlerTestCase.java
new file mode 100644
index 00000000000..405aa1c0b96
--- /dev/null
+++ b/messagebus/src/test/java/com/yahoo/messagebus/ThrottlerTestCase.java
@@ -0,0 +1,230 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus;
+
+import com.yahoo.jrt.ListenFailedException;
+import com.yahoo.jrt.slobrok.server.Slobrok;
+import com.yahoo.messagebus.network.rpc.test.TestServer;
+import com.yahoo.messagebus.routing.RoutingTableSpec;
+import com.yahoo.messagebus.test.*;
+
+import java.net.UnknownHostException;
+import java.util.Arrays;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public class ThrottlerTestCase extends junit.framework.TestCase {
+
+ ////////////////////////////////////////////////////////////////////////////////
+ //
+ // Setup
+ //
+ ////////////////////////////////////////////////////////////////////////////////
+
+ Slobrok slobrok;
+ TestServer src, dst;
+
+ public void setUp() throws ListenFailedException, UnknownHostException {
+ RoutingTableSpec table = new RoutingTableSpec(SimpleProtocol.NAME);
+ table.addHop("dst", "test/dst/session", Arrays.asList("test/dst/session"));
+ table.addRoute("test", Arrays.asList("dst"));
+ slobrok = new Slobrok();
+ src = new TestServer("test/src", table, slobrok, null, null);
+ dst = new TestServer("test/dst", table, slobrok, null, null);
+ }
+
+ public void tearDown() {
+ dst.destroy();
+ src.destroy();
+ slobrok.stop();
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////
+ //
+ // Tests
+ //
+ ////////////////////////////////////////////////////////////////////////////////
+
+ public void testMaxCount() {
+ // Prepare a source session with throttle enabled.
+ SourceSessionParams params = new SourceSessionParams().setTimeout(600.0);
+ StaticThrottlePolicy policy = new StaticThrottlePolicy();
+ policy.setMaxPendingCount(10);
+ params.setThrottlePolicy(policy);
+
+ Receptor src_rr = new Receptor();
+ SourceSession src_s = src.mb.createSourceSession(src_rr, params);
+
+ // Prepare a destination session to acknowledge messages.
+ QueueAdapter dst_q = new QueueAdapter();
+ DestinationSession dst_s = dst.mb.createDestinationSession("session", true, dst_q);
+ src.waitSlobrok("test/dst/session", 1);
+
+ // Send until throttler rejects a message.
+ for (int i = 0; i < policy.getMaxPendingCount(); i++) {
+ assertTrue(src_s.send(new SimpleMessage("msg"), "test").isAccepted());
+ }
+ assertFalse(src_s.send(new SimpleMessage("msg"), "test").isAccepted());
+
+ // Acknowledge one message at a time, then attempt to send two more.
+ for (int i = 0; i < 10; i++) {
+ assertTrue(dst_q.waitSize(policy.getMaxPendingCount(), 60));
+ dst_s.acknowledge((Message)dst_q.dequeue());
+
+ assertNotNull(src_rr.getReply(60));
+ assertTrue(src_s.send(new SimpleMessage("msg"), "test").isAccepted());
+ assertFalse(src_s.send(new SimpleMessage("msg"), "test").isAccepted());
+ }
+
+ assertTrue(dst_q.waitSize(policy.getMaxPendingCount(), 60));
+ while (!dst_q.isEmpty()) {
+ dst_s.acknowledge((Message)dst_q.dequeue());
+ }
+
+ src_s.close();
+ dst_s.destroy();
+ }
+
+ public void testMaxSize() {
+ // Prepare a source session with throttle enabled.
+ SourceSessionParams params = new SourceSessionParams().setTimeout(600.0);
+ StaticThrottlePolicy policy = new StaticThrottlePolicy();
+ policy.setMaxPendingCount(1000);
+ policy.setMaxPendingSize(2);
+ params.setThrottlePolicy(policy);
+
+ Receptor src_rr = new Receptor();
+ SourceSession src_s = src.mb.createSourceSession(src_rr, params);
+
+ // Prepare a destination session to acknowledge messages.
+ QueueAdapter dst_q = new QueueAdapter();
+ DestinationSession dst_s = dst.mb.createDestinationSession("session", true, dst_q);
+ src.waitSlobrok("test/dst/session", 1);
+
+ assertTrue(src_s.send(new SimpleMessage("1"), "test").isAccepted());
+ assertTrue(dst_q.waitSize(1, 60));
+ assertTrue(src_s.send(new SimpleMessage("12"), "test").isAccepted());
+ assertTrue(dst_q.waitSize(2, 60));
+
+ assertFalse(src_s.send(new SimpleMessage("1"), "test").isAccepted());
+ dst_s.acknowledge((Message)dst_q.dequeue());
+ assertNotNull(src_rr.getReply(60));
+
+ assertFalse(src_s.send(new SimpleMessage("1"), "test").isAccepted());
+ dst_s.acknowledge((Message)dst_q.dequeue());
+ assertNotNull(src_rr.getReply(60));
+
+ assertTrue(src_s.send(new SimpleMessage("12"), "test").isAccepted());
+ assertTrue(dst_q.waitSize(1, 60));
+ assertFalse(src_s.send(new SimpleMessage("1"), "test").isAccepted());
+ dst_s.acknowledge((Message)dst_q.dequeue());
+ assertNotNull(src_rr.getReply(60));
+
+ // Close sessions.
+ src_s.close();
+ dst_s.destroy();
+ }
+
+ public void testDynamicWindowSize() {
+ CustomTimer timer = new CustomTimer();
+ DynamicThrottlePolicy policy = new DynamicThrottlePolicy(timer);
+
+ policy.setWindowSizeIncrement(5);
+ policy.setResizeRate(1);
+
+ double windowSize = getWindowSize(policy, timer, 100);
+ assertTrue(windowSize >= 90 && windowSize <= 110);
+
+ windowSize = getWindowSize(policy, timer, 200);
+ assertTrue(windowSize >= 90 && windowSize <= 210);
+
+ windowSize = getWindowSize(policy, timer, 50);
+ assertTrue(windowSize >= 9 && windowSize <= 55);
+
+ windowSize = getWindowSize(policy, timer, 500);
+ assertTrue(windowSize >= 90 && windowSize <= 505);
+
+ windowSize = getWindowSize(policy, timer, 100);
+ assertTrue(windowSize >= 90 && windowSize <= 115);
+ }
+
+ public void testIdleTimePeriod() {
+ CustomTimer timer = new CustomTimer();
+ DynamicThrottlePolicy policy = new DynamicThrottlePolicy(timer);
+
+ policy.setWindowSizeIncrement(5);
+ policy.setResizeRate(1);
+
+ double windowSize = getWindowSize(policy, timer, 100);
+ assertTrue(windowSize >= 90 && windowSize <= 110);
+
+ Message msg = new SimpleMessage("foo");
+ timer.millis += 30 * 1000;
+ assertTrue(policy.canSend(msg, 0));
+ assertTrue(windowSize >= 90 && windowSize <= 110);
+
+ timer.millis += 60 * 1000 + 1;
+ assertTrue(policy.canSend(msg, 50));
+ assertEquals(55, policy.getMaxPendingCount());
+
+ timer.millis += 60 * 1000 + 1;
+ assertTrue(policy.canSend(msg, 0));
+ assertEquals(5, policy.getMaxPendingCount());
+
+ }
+
+ public void testMinWindowSize() {
+ CustomTimer timer = new CustomTimer();
+ DynamicThrottlePolicy policy = new DynamicThrottlePolicy(timer);
+
+ policy.setWindowSizeIncrement(5);
+ policy.setResizeRate(1);
+ policy.setMinWindowSize(150);
+
+ double windowSize = getWindowSize(policy, timer, 200);
+ assertTrue(windowSize >= 150 && windowSize <= 210);
+ }
+
+ public void testMaxWindowSize() {
+ CustomTimer timer = new CustomTimer();
+ DynamicThrottlePolicy policy = new DynamicThrottlePolicy(timer);
+
+ policy.setWindowSizeIncrement(5);
+ policy.setResizeRate(1);
+ policy.setMaxWindowSize(50);
+
+ double windowSize = getWindowSize(policy, timer, 100);
+ assertTrue(windowSize >= 40 && windowSize <= 50);
+ }
+
+
+ ////////////////////////////////////////////////////////////////////////////////
+ //
+ // Utilities
+ //
+ ////////////////////////////////////////////////////////////////////////////////
+
+ private int getWindowSize(DynamicThrottlePolicy policy, CustomTimer timer, int maxPending) {
+ Message msg = new SimpleMessage("foo");
+ Reply reply = new SimpleReply("bar");
+ reply.setContext(1);
+ for (int i = 0; i < 999; ++i) {
+ int numPending = 0;
+ while (policy.canSend(msg, numPending)) {
+ policy.processMessage(msg);
+ ++numPending;
+ }
+
+ long tripTime = (numPending < maxPending) ? 1000 : 1000 + (numPending - maxPending) * 1000;
+ timer.millis += tripTime;
+
+ while (--numPending >= 0) {
+ policy.processReply(reply);
+ }
+ }
+ int ret = policy.getMaxPendingCount();
+ System.out.println("getWindowSize() = " + ret);
+ return ret;
+ }
+
+}
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/TimeoutTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/TimeoutTestCase.java
new file mode 100755
index 00000000000..05a14bf8d16
--- /dev/null
+++ b/messagebus/src/test/java/com/yahoo/messagebus/TimeoutTestCase.java
@@ -0,0 +1,102 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus;
+
+import com.yahoo.jrt.ListenFailedException;
+import com.yahoo.jrt.slobrok.server.Slobrok;
+import com.yahoo.messagebus.network.Identity;
+import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
+import com.yahoo.messagebus.network.rpc.test.TestServer;
+import com.yahoo.messagebus.routing.Route;
+import com.yahoo.messagebus.test.Receptor;
+import com.yahoo.messagebus.test.SimpleMessage;
+import com.yahoo.messagebus.test.SimpleProtocol;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.net.UnknownHostException;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public class TimeoutTestCase {
+
+ private final Slobrok slobrok;
+ private final TestServer srcServer, dstServer;
+ private final SourceSession srcSession;
+ private final DestinationSession dstSession;
+
+ public TimeoutTestCase() throws ListenFailedException, UnknownHostException {
+ slobrok = new Slobrok();
+ dstServer = new TestServer(new MessageBusParams().addProtocol(new SimpleProtocol()),
+ new RPCNetworkParams().setIdentity(new Identity("dst"))
+ .setSlobrokConfigId(TestServer.getSlobrokConfig(slobrok)));
+ dstSession = dstServer.mb.createDestinationSession(new DestinationSessionParams()
+ .setName("session")
+ .setMessageHandler(new Receptor()));
+ srcServer = new TestServer(new MessageBusParams().addProtocol(new SimpleProtocol()),
+ new RPCNetworkParams().setSlobrokConfigId(TestServer.getSlobrokConfig(slobrok)));
+ srcSession = srcServer.mb.createSourceSession(
+ new SourceSessionParams().setTimeout(600.0).setReplyHandler(new Receptor()));
+ }
+
+ @Before
+ public void waitForSlobrokRegistration() {
+ assertTrue(srcServer.waitSlobrok("dst/session", 1));
+ }
+
+ @After
+ public void destroyResources() {
+ slobrok.stop();
+ dstSession.destroy();
+ dstServer.destroy();
+ srcSession.destroy();
+ srcServer.destroy();
+
+ Message msg = ((Receptor)dstSession.getMessageHandler()).getMessage(0);
+ if (msg != null) {
+ msg.discard();
+ }
+ }
+
+ @Test
+ public void requireThatMessageCanTimeout() throws ListenFailedException, UnknownHostException {
+ srcSession.setTimeout(1);
+ assertSend(srcSession, newMessage(), "dst/session");
+ assertTimeout(((Receptor)srcSession.getReplyHandler()).getReply(60));
+ }
+
+ @Test
+ public void requireThatZeroTimeoutMeansImmediateTimeout() throws ListenFailedException, UnknownHostException {
+ srcSession.setTimeout(0);
+ assertSend(srcSession, newMessage(), "dst/session");
+ assertTimeout(((Receptor)srcSession.getReplyHandler()).getReply(60));
+ }
+
+ private static void assertSend(SourceSession session, Message msg, String route) {
+ assertTrue(session.send(msg, Route.parse(route)).isAccepted());
+ }
+
+ private static void assertTimeout(Reply reply) {
+ assertNotNull(reply);
+ assertTrue(reply.getTrace().toString(), hasError(reply, ErrorCode.TIMEOUT));
+ }
+
+ private static Message newMessage() {
+ Message msg = new SimpleMessage("msg");
+ msg.getTrace().setLevel(9);
+ return msg;
+ }
+
+ private static boolean hasError(Reply reply, int errorCode) {
+ for (int i = 0; i < reply.getNumErrors(); ++i) {
+ if (reply.getError(i).getCode() == errorCode) {
+ return true;
+ }
+ }
+ return false;
+ }
+}
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/TraceTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/TraceTestCase.java
new file mode 100755
index 00000000000..c6b67087b7f
--- /dev/null
+++ b/messagebus/src/test/java/com/yahoo/messagebus/TraceTestCase.java
@@ -0,0 +1,286 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public class TraceTestCase extends junit.framework.TestCase {
+
+ public void testEncodeDecode() {
+ assertEquals("()", TraceNode.decode("").encode());
+ assertEquals("()", TraceNode.decode("[xyz").encode());
+ assertEquals("([xyz][])", TraceNode.decode("[xyz][]").encode());
+ assertEquals("[xyz]", TraceNode.decode("[xyz]").encode());
+ assertEquals("()", TraceNode.decode("{()").encode());
+ assertEquals("({()}{})", TraceNode.decode("{()}{}").encode());
+ assertEquals("{()}", TraceNode.decode("{()}").encode());
+ assertEquals("()", TraceNode.decode("({}").encode());
+ assertEquals("(({})())", TraceNode.decode("({})()").encode());
+ assertEquals("([])", TraceNode.decode("([])").encode());
+
+ assertTrue(TraceNode.decode("").isEmpty());
+ assertTrue(!TraceNode.decode("([note])").isEmpty());
+
+ String str =
+ "([[17/Jun/2009:09:02:30 +0200\\] Message (type 1) received at 'dst' for session 'session'.]" +
+ "[[17/Jun/2009:09:02:30 +0200\\] [APP_TRANSIENT_ERROR @ localhost\\]: err1]" +
+ "[[17/Jun/2009:09:02:30 +0200\\] Sending reply (version 4.2) from 'dst'.])";
+ System.out.println(TraceNode.decode(str).toString());
+ assertEquals(str, TraceNode.decode(str).encode());
+
+ str = "([Note 0][Note 1]{[Note 2]}{([Note 3])({[Note 4]})})";
+ TraceNode t = TraceNode.decode(str);
+ assertEquals(str, t.encode());
+
+ assertTrue(t.isRoot());
+ assertTrue(t.isStrict());
+ assertTrue(!t.isLeaf());
+ assertEquals(4, t.getNumChildren());
+
+ {
+ TraceNode c = t.getChild(0);
+ assertTrue(c.isLeaf());
+ assertEquals("Note 0", c.getNote());
+ }
+ {
+ TraceNode c = t.getChild(1);
+ assertTrue(c.isLeaf());
+ assertEquals("Note 1", c.getNote());
+ }
+ {
+ TraceNode c = t.getChild(2);
+ assertTrue(!c.isLeaf());
+ assertTrue(!c.isStrict());
+ assertEquals(1, c.getNumChildren());
+ {
+ TraceNode d = c.getChild(0);
+ assertTrue(d.isLeaf());
+ assertEquals("Note 2", d.getNote());
+ }
+ }
+ {
+ TraceNode c = t.getChild(3);
+ assertTrue(!c.isStrict());
+ assertEquals(2, c.getNumChildren());
+ {
+ TraceNode d = c.getChild(0);
+ assertTrue(d.isStrict());
+ assertTrue(!d.isLeaf());
+ assertEquals(1, d.getNumChildren());
+ {
+ TraceNode e = d.getChild(0);
+ assertTrue(e.isLeaf());
+ assertEquals("Note 3", e.getNote());
+ }
+ }
+ {
+ TraceNode d = c.getChild(1);
+ assertTrue(d.isStrict());
+ assertEquals(1, d.getNumChildren());
+ {
+ TraceNode e = d.getChild(0);
+ assertTrue(!e.isStrict());
+ assertEquals(1, e.getNumChildren());
+ {
+ TraceNode f = e.getChild(0);
+ assertTrue(f.isLeaf());
+ assertEquals("Note 4", f.getNote());
+ }
+ }
+ }
+ }
+ }
+
+ public void testReservedChars() {
+ TraceNode t = new TraceNode();
+ t.addChild("abc(){}[]\\xyz");
+ assertEquals("abc(){}[]\\xyz", t.getChild(0).getNote());
+ assertEquals("([abc(){}[\\]\\\\xyz])", t.encode());
+ {
+ // test swap/clear/empty here
+ TraceNode t2 = new TraceNode();
+ assertTrue(t2.isEmpty());
+ t2.swap(t);
+ assertTrue(!t2.isEmpty());
+ assertEquals("abc(){}[]\\xyz", t2.getChild(0).getNote());
+ assertEquals("([abc(){}[\\]\\\\xyz])", t2.encode());
+ t2.clear();
+ assertTrue(t2.isEmpty());
+ }
+ }
+
+ public void testAdd() {
+ TraceNode t1 = TraceNode.decode("([x])");
+ TraceNode t2 = TraceNode.decode("([y])");
+ TraceNode t3 = TraceNode.decode("([z])");
+
+ t1.addChild(t2);
+ assertEquals("([x]([y]))", t1.encode());
+ assertTrue(t1.getChild(1).isStrict());
+ t1.addChild("txt");
+ assertTrue(t1.getChild(2).isLeaf());
+ assertEquals("([x]([y])[txt])", t1.encode());
+ t3.addChild(t1);
+ assertEquals("([z]([x]([y])[txt]))", t3.encode());
+
+ // crazy but possible (everything is by value)
+ t2.addChild(t2).addChild(t2);
+ assertEquals("([y]([y])([y]([y])))", t2.encode());
+ }
+
+ public void testStrict() {
+ assertEquals("{}", TraceNode.decode("()").setStrict(false).encode());
+ assertEquals("{[x]}", TraceNode.decode("([x])").setStrict(false).encode());
+ assertEquals("{[x][y]}", TraceNode.decode("([x][y])").setStrict(false).encode());
+ }
+
+ public void testTraceLevel() {
+ Trace t = new Trace();
+ t.setLevel(4);
+ assertEquals(4, t.getLevel());
+ t.trace(9, "no");
+ assertEquals(0, t.getRoot().getNumChildren());
+ t.trace(8, "no");
+ assertEquals(0, t.getRoot().getNumChildren());
+ t.trace(7, "no");
+ assertEquals(0, t.getRoot().getNumChildren());
+ t.trace(6, "no");
+ assertEquals(0, t.getRoot().getNumChildren());
+ t.trace(5, "no");
+ assertEquals(0, t.getRoot().getNumChildren());
+ t.trace(4, "yes");
+ assertEquals(1, t.getRoot().getNumChildren());
+ t.trace(3, "yes");
+ assertEquals(2, t.getRoot().getNumChildren());
+ t.trace(2, "yes");
+ assertEquals(3, t.getRoot().getNumChildren());
+ t.trace(1, "yes");
+ assertEquals(4, t.getRoot().getNumChildren());
+ t.trace(0, "yes");
+ assertEquals(5, t.getRoot().getNumChildren());
+ }
+
+ public void testCompact() {
+ assertEquals("()", TraceNode.decode("()").compact().encode());
+ assertEquals("()", TraceNode.decode("(())").compact().encode());
+ assertEquals("()", TraceNode.decode("(()())").compact().encode());
+ assertEquals("()", TraceNode.decode("({})").compact().encode());
+ assertEquals("()", TraceNode.decode("({}{})").compact().encode());
+ assertEquals("()", TraceNode.decode("({{}{}})").compact().encode());
+
+ assertEquals("([x])", TraceNode.decode("([x])").compact().encode());
+ assertEquals("([x])", TraceNode.decode("(([x]))").compact().encode());
+ assertEquals("([x][y])", TraceNode.decode("(([x])([y]))").compact().encode());
+ assertEquals("([x])", TraceNode.decode("({[x]})").compact().encode());
+ assertEquals("([x][y])", TraceNode.decode("({[x]}{[y]})").compact().encode());
+ assertEquals("({[x][y]})", TraceNode.decode("({{[x]}{[y]}})").compact().encode());
+
+ assertEquals("([a][b][c][d])", TraceNode.decode("(([a][b])([c][d]))").compact().encode());
+ assertEquals("({[a][b]}{[c][d]})", TraceNode.decode("({[a][b]}{[c][d]})").compact().encode());
+ assertEquals("({[a][b][c][d]})", TraceNode.decode("({{[a][b]}{[c][d]}})").compact().encode());
+ assertEquals("({([a][b])([c][d])})", TraceNode.decode("({([a][b])([c][d])})").compact().encode());
+
+ assertEquals("({{}{(({()}({}){()(){}}){})}})", TraceNode.decode("({{}{(({()}({}){()(){}}){})}})").encode());
+ assertEquals("()", TraceNode.decode("({{}{(({()}({}){()(){}}){})}})").compact().encode());
+ assertEquals("([x])", TraceNode.decode("({{}{([x]({()}({}){()(){}}){})}})").compact().encode());
+ assertEquals("([x])", TraceNode.decode("({{}{(({()}({[x]}){()(){}}){})}})").compact().encode());
+ assertEquals("([x])", TraceNode.decode("({{}{(({()}({}){()(){}})[x]{})}})").compact().encode());
+
+ assertEquals("({[a][b][c][d][e][f]})", TraceNode.decode("({({[a][b]})({[c][d]})({[e][f]})})").compact().encode());
+ }
+
+ public void testSort() {
+ assertEquals("([b][a][c])", TraceNode.decode("([b][a][c])").sort().encode());
+ assertEquals("({[a][b][c]})", TraceNode.decode("({[b][a][c]})").sort().encode());
+ assertEquals("(([c][a])([b]))", TraceNode.decode("(([c][a])([b]))").sort().encode());
+ assertEquals("({[b]([c][a])})", TraceNode.decode("({([c][a])[b]})").sort().encode());
+ assertEquals("({[a][c]}[b])", TraceNode.decode("({[c][a]}[b])").sort().encode());
+ assertEquals("({([b]){[a][c]}})", TraceNode.decode("({{[c][a]}([b])})").sort().encode());
+ }
+
+ public void testNormalize() {
+ TraceNode t1 = TraceNode.decode("({([a][b]{[x][y]([p][q])})([c][d])([e][f])})");
+ TraceNode t2 = TraceNode.decode("({([a][b]{[y][x]([p][q])})([c][d])([e][f])})");
+ TraceNode t3 = TraceNode.decode("({([a][b]{[y]([p][q])[x]})([c][d])([e][f])})");
+ TraceNode t4 = TraceNode.decode("({([e][f])([a][b]{[y]([p][q])[x]})([c][d])})");
+ TraceNode t5 = TraceNode.decode("({([e][f])([c][d])([a][b]{([p][q])[y][x]})})");
+
+ TraceNode tx = TraceNode.decode("({([b][a]{[x][y]([p][q])})([c][d])([e][f])})");
+ TraceNode ty = TraceNode.decode("({([a][b]{[x][y]([p][q])})([d][c])([e][f])})");
+ TraceNode tz = TraceNode.decode("({([a][b]{[x][y]([q][p])})([c][d])([e][f])})");
+
+ assertEquals("({([a][b]{[x][y]([p][q])})([c][d])([e][f])})", t1.compact().encode());
+
+ assertTrue(!t1.compact().encode().equals(t2.compact().encode()));
+ assertTrue(!t1.compact().encode().equals(t3.compact().encode()));
+ assertTrue(!t1.compact().encode().equals(t4.compact().encode()));
+ assertTrue(!t1.compact().encode().equals(t5.compact().encode()));
+ assertTrue(!t1.compact().encode().equals(tx.compact().encode()));
+ assertTrue(!t1.compact().encode().equals(ty.compact().encode()));
+ assertTrue(!t1.compact().encode().equals(tz.compact().encode()));
+
+ System.out.println("1: " + t1.normalize().encode());
+ System.out.println("2: " + t2.normalize().encode());
+ System.out.println("3: " + t3.normalize().encode());
+ System.out.println("4: " + t4.normalize().encode());
+ System.out.println("5: " + t5.normalize().encode());
+ System.out.println("x: " + tx.normalize().encode());
+ System.out.println("y: " + ty.normalize().encode());
+ System.out.println("z: " + tz.normalize().encode());
+ assertTrue(t1.normalize().encode().equals(t2.normalize().encode()));
+ assertTrue(t1.normalize().encode().equals(t3.normalize().encode()));
+ assertTrue(t1.normalize().encode().equals(t4.normalize().encode()));
+ assertTrue(t1.normalize().encode().equals(t5.normalize().encode()));
+ assertTrue(!t1.normalize().encode().equals(tx.normalize().encode()));
+ assertTrue(!t1.normalize().encode().equals(ty.normalize().encode()));
+ assertTrue(!t1.normalize().encode().equals(tz.normalize().encode()));
+
+ assertEquals("({([c][d])([e][f])([a][b]{[x][y]([p][q])})})", t1.normalize().encode());
+ }
+
+ public void testTraceDump() {
+ {
+ Trace big = new Trace();
+ TraceNode b1 = new TraceNode();
+ TraceNode b2 = new TraceNode();
+ for (int i = 0; i < 100; ++i) {
+ b2.addChild("test");
+ }
+ for (int i = 0; i < 10; ++i) {
+ b1.addChild(b2);
+ }
+ for (int i = 0; i < 10; ++i) {
+ big.getRoot().addChild(b1);
+ }
+ String normal = big.toString();
+ String full = big.getRoot().toString();
+ assertTrue(normal.length() > 30000);
+ assertTrue(normal.length() < 32000);
+ assertTrue(full.length() > 50000);
+ assertEquals(normal.substring(0, 30000), full.substring(0, 30000));
+ }
+ {
+ TraceNode s1 = new TraceNode();
+ TraceNode s2 = new TraceNode();
+ s2.addChild("test");
+ s2.addChild("test");
+ s1.addChild(s2);
+ s1.addChild(s2);
+ assertEquals("...\n", s1.toString(0));
+ assertEquals("<trace>\n...\n", s1.toString(1));
+ assertEquals("<trace>\n" + // 8 8
+ " <trace>\n" + // 12 20
+ " test\n" + // 13 33
+ "...\n", s1.toString(33));
+ assertEquals("<trace>\n" + // 8 8
+ " test\n" + // 9 17
+ " test\n" + // 9 26
+ "...\n", s2.toString(26));
+ assertEquals("<trace>\n" + // 8 8
+ " test\n" + // 9 17
+ " test\n" + // 9 26
+ "</trace>\n", s2.toString(27));
+ assertEquals(s2.toString(27), s2.toString());
+ }
+ }
+}
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/TraceTripTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/TraceTripTestCase.java
new file mode 100755
index 00000000000..e675a8ef98a
--- /dev/null
+++ b/messagebus/src/test/java/com/yahoo/messagebus/TraceTripTestCase.java
@@ -0,0 +1,116 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus;
+
+import com.yahoo.jrt.ListenFailedException;
+import com.yahoo.jrt.slobrok.server.Slobrok;
+import com.yahoo.messagebus.network.rpc.test.TestServer;
+import com.yahoo.messagebus.routing.RoutingTableSpec;
+import com.yahoo.messagebus.test.Receptor;
+import com.yahoo.messagebus.test.SimpleMessage;
+import com.yahoo.messagebus.test.SimpleProtocol;
+
+import java.net.UnknownHostException;
+import java.util.Arrays;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public class TraceTripTestCase extends junit.framework.TestCase {
+
+ Slobrok slobrok;
+ TestServer src;
+ TestServer pxy;
+ TestServer dst;
+
+ public TraceTripTestCase(String message) {
+ super(message);
+ }
+
+ public void setUp() throws ListenFailedException, UnknownHostException {
+ RoutingTableSpec table = new RoutingTableSpec(SimpleProtocol.NAME)
+ .addHop("pxy", "test/pxy/session", Arrays.asList("test/pxy/session"))
+ .addHop("dst", "test/dst/session", Arrays.asList("test/dst/session"))
+ .addRoute("test", Arrays.asList("pxy", "dst"));
+
+ slobrok = new Slobrok();
+ src = new TestServer("test/src", table, slobrok, null, null);
+ pxy = new TestServer("test/pxy", table, slobrok, null, null);
+ dst = new TestServer("test/dst", table, slobrok, null, null);
+ }
+
+ public void tearDown() {
+ dst.destroy();
+ pxy.destroy();
+ src.destroy();
+ slobrok.stop();
+ }
+
+ public void testTrip() {
+ Receptor src_rr = new Receptor();
+ SourceSession src_s = src.mb.createSourceSession(src_rr);
+
+ new Proxy(pxy.mb);
+ assertTrue(src.waitSlobrok("test/pxy/session", 1));
+
+ new Server(dst.mb);
+ assertTrue(src.waitSlobrok("test/dst/session", 1));
+ assertTrue(pxy.waitSlobrok("test/dst/session", 1));
+
+ Message msg = new SimpleMessage("");
+ msg.getTrace().setLevel(1);
+ msg.getTrace().trace(1, "Client message", false);
+ src_s.send(msg, "test");
+ Reply reply = src_rr.getReply(60);
+ reply.getTrace().trace(1, "Client reply", false);
+ assertTrue(reply.getNumErrors() == 0);
+
+ TraceNode t = new TraceNode()
+ .addChild("Client message")
+ .addChild("Proxy message")
+ .addChild("Server message")
+ .addChild("Server reply")
+ .addChild("Proxy reply")
+ .addChild("Client reply");
+ System.out.println("reply: " + reply.getTrace().getRoot().encode());
+ System.out.println("want : " + t.encode());
+ assertTrue(reply.getTrace().getRoot().encode().equals(t.encode()));
+ }
+
+ private static class Proxy implements MessageHandler, ReplyHandler {
+ private IntermediateSession session;
+
+ public Proxy(MessageBus bus) {
+ session = bus.createIntermediateSession("session", true, this, this);
+ }
+
+ public void handleMessage(Message msg) {
+ msg.getTrace().trace(1, "Proxy message", false);
+ System.out.println(msg.getTrace().getRoot().encode());
+ session.forward(msg);
+ }
+
+ public void handleReply(Reply reply) {
+ reply.getTrace().trace(1, "Proxy reply", false);
+ System.out.println(reply.getTrace().getRoot().encode());
+ session.forward(reply);
+ }
+ }
+
+ private static class Server implements MessageHandler {
+ private DestinationSession session;
+
+ public Server(MessageBus bus) {
+ session = bus.createDestinationSession("session", true, this);
+ }
+
+ public void handleMessage(Message msg) {
+ msg.getTrace().trace(1, "Server message", false);
+ System.out.println(msg.getTrace().getRoot().encode());
+ Reply reply = new EmptyReply();
+ msg.swapState(reply);
+ reply.getTrace().trace(1, "Server reply", false);
+ System.out.println(reply.getTrace().getRoot().encode());
+ session.reply(reply);
+ }
+ }
+}
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/network/IdentityTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/network/IdentityTestCase.java
new file mode 100644
index 00000000000..2191b2750bf
--- /dev/null
+++ b/messagebus/src/test/java/com/yahoo/messagebus/network/IdentityTestCase.java
@@ -0,0 +1,28 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.network;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen Hult</a>
+ */
+public class IdentityTestCase {
+
+ @Test
+ public void requireThatAccessorsWork() {
+ Identity id = new Identity("foo");
+ assertNotNull(id.getHostname());
+ assertEquals("foo", id.getServicePrefix());
+ }
+
+ @Test
+ public void requireThatCopyConstructorWorks() {
+ Identity lhs = new Identity("foo");
+ Identity rhs = new Identity(lhs);
+ assertEquals(lhs.getHostname(), rhs.getHostname());
+ assertEquals(lhs.getServicePrefix(), rhs.getServicePrefix());
+ }
+}
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/network/local/LocalNetworkTest.java b/messagebus/src/test/java/com/yahoo/messagebus/network/local/LocalNetworkTest.java
new file mode 100644
index 00000000000..296e724a502
--- /dev/null
+++ b/messagebus/src/test/java/com/yahoo/messagebus/network/local/LocalNetworkTest.java
@@ -0,0 +1,132 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.network.local;
+
+import com.yahoo.messagebus.DestinationSession;
+import com.yahoo.messagebus.DestinationSessionParams;
+import com.yahoo.messagebus.EmptyReply;
+import com.yahoo.messagebus.IntermediateSession;
+import com.yahoo.messagebus.IntermediateSessionParams;
+import com.yahoo.messagebus.Message;
+import com.yahoo.messagebus.MessageBus;
+import com.yahoo.messagebus.MessageBusParams;
+import com.yahoo.messagebus.MessageHandler;
+import com.yahoo.messagebus.Reply;
+import com.yahoo.messagebus.ReplyHandler;
+import com.yahoo.messagebus.SourceSession;
+import com.yahoo.messagebus.SourceSessionParams;
+import com.yahoo.messagebus.routing.Hop;
+import com.yahoo.messagebus.routing.Route;
+import com.yahoo.messagebus.test.SimpleMessage;
+import com.yahoo.messagebus.test.SimpleProtocol;
+import com.yahoo.messagebus.test.SimpleReply;
+import org.junit.Test;
+
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen Hult</a>
+ */
+public class LocalNetworkTest {
+
+ @Test
+ public void requireThatLocalNetworkCanSendAndReceive() throws InterruptedException {
+ final LocalWire wire = new LocalWire();
+
+ final Server serverA = new Server(wire);
+ final SourceSession source = serverA.newSourceSession();
+
+ final Server serverB = new Server(wire);
+ final IntermediateSession intermediate = serverB.newIntermediateSession();
+
+ final Server serverC = new Server(wire);
+ final DestinationSession destination = serverC.newDestinationSession();
+
+ Message msg = new SimpleMessage("foo");
+ msg.setRoute(new Route().addHop(Hop.parse(intermediate.getConnectionSpec()))
+ .addHop(Hop.parse(destination.getConnectionSpec())));
+ assertThat(source.send(msg).isAccepted(), is(true));
+
+ msg = serverB.messages.poll(60, TimeUnit.SECONDS);
+ assertThat(msg, instanceOf(SimpleMessage.class));
+ assertThat(((SimpleMessage)msg).getValue(), is("foo"));
+ intermediate.forward(msg);
+
+ msg = serverC.messages.poll(60, TimeUnit.SECONDS);
+ assertThat(msg, instanceOf(SimpleMessage.class));
+ assertThat(((SimpleMessage)msg).getValue(), is("foo"));
+ Reply reply = new SimpleReply("bar");
+ reply.swapState(msg);
+ destination.reply(reply);
+
+ reply = serverB.replies.poll(60, TimeUnit.SECONDS);
+ assertThat(reply, instanceOf(SimpleReply.class));
+ assertThat(((SimpleReply)reply).getValue(), is("bar"));
+ intermediate.forward(reply);
+
+ reply = serverA.replies.poll(60, TimeUnit.SECONDS);
+ assertThat(reply, instanceOf(SimpleReply.class));
+ assertThat(((SimpleReply)reply).getValue(), is("bar"));
+
+ serverA.mbus.destroy();
+ serverB.mbus.destroy();
+ serverC.mbus.destroy();
+ }
+
+ @Test
+ public void requireThatUnknownServiceRepliesWithNoAddressForService() throws InterruptedException {
+ final Server server = new Server(new LocalWire());
+ final SourceSession source = server.newSourceSession();
+
+ final Message msg = new SimpleMessage("foo").setRoute(Route.parse("bar"));
+ assertThat(source.send(msg).isAccepted(), is(true));
+ final Reply reply = server.replies.poll(60, TimeUnit.SECONDS);
+ assertThat(reply, instanceOf(EmptyReply.class));
+
+ server.mbus.destroy();
+ }
+
+ private static class Server implements MessageHandler, ReplyHandler {
+
+ final MessageBus mbus;
+ final BlockingDeque<Message> messages = new LinkedBlockingDeque<>();
+ final BlockingDeque<Reply> replies = new LinkedBlockingDeque<>();
+
+ Server(final LocalWire wire) {
+ mbus = new MessageBus(new LocalNetwork(wire),
+ new MessageBusParams().addProtocol(new SimpleProtocol())
+ .setRetryPolicy(null));
+ }
+
+ SourceSession newSourceSession() {
+ return mbus.createSourceSession(
+ new SourceSessionParams().setTimeout(600.0).setReplyHandler(this));
+ }
+
+ IntermediateSession newIntermediateSession() {
+ return mbus.createIntermediateSession(new IntermediateSessionParams()
+ .setMessageHandler(this)
+ .setReplyHandler(this));
+ }
+
+ DestinationSession newDestinationSession() {
+ return mbus.createDestinationSession(new DestinationSessionParams()
+ .setMessageHandler(this));
+ }
+
+ @Override
+ public void handleMessage(final Message msg) {
+ messages.addLast(msg);
+ }
+
+ @Override
+ public void handleReply(final Reply reply) {
+ replies.addLast(reply);
+ }
+ }
+}
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/BasicNetworkTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/BasicNetworkTestCase.java
new file mode 100644
index 00000000000..7fe481a2196
--- /dev/null
+++ b/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/BasicNetworkTestCase.java
@@ -0,0 +1,152 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.network.rpc;
+
+
+import com.yahoo.concurrent.SystemTimer;
+import com.yahoo.jrt.ListenFailedException;
+import com.yahoo.jrt.slobrok.server.Slobrok;
+import com.yahoo.messagebus.*;
+import com.yahoo.messagebus.network.rpc.test.TestServer;
+import com.yahoo.messagebus.routing.Route;
+import com.yahoo.messagebus.routing.RoutingTableSpec;
+import com.yahoo.messagebus.test.Receptor;
+import com.yahoo.messagebus.test.SimpleMessage;
+import com.yahoo.messagebus.test.SimpleProtocol;
+import com.yahoo.messagebus.test.SimpleReply;
+
+import java.net.UnknownHostException;
+import java.util.Arrays;
+
+
+/**
+ * @author <a href="mailto:havardpe@yahoo-inc.com">Haavard Pettersen</a>
+ */
+public class BasicNetworkTestCase extends junit.framework.TestCase {
+
+ Slobrok slobrok;
+ TestServer src;
+ TestServer pxy;
+ TestServer dst;
+
+ public void setUp() throws ListenFailedException, UnknownHostException {
+ RoutingTableSpec table = new RoutingTableSpec(SimpleProtocol.NAME);
+ table.addHop("pxy", "test/pxy/session", Arrays.asList("test/pxy/session"));
+ table.addHop("dst", "test/dst/session", Arrays.asList("test/dst/session"));
+ table.addRoute("test", Arrays.asList("pxy", "dst"));
+ slobrok = new Slobrok();
+ src = new TestServer("test/src", table, slobrok, null, null);
+ pxy = new TestServer("test/pxy", table, slobrok, null, null);
+ dst = new TestServer("test/dst", table, slobrok, null, null);
+ }
+
+ public void tearDown() {
+ dst.destroy();
+ pxy.destroy();
+ src.destroy();
+ slobrok.stop();
+ }
+
+ public void testNetwork() {
+ // set up receptors
+ Receptor src_rr = new Receptor();
+ Receptor pxy_mr = new Receptor();
+ Receptor pxy_rr = new Receptor();
+ Receptor dst_mr = new Receptor();
+
+ // set up sessions
+ SourceSessionParams sp = new SourceSessionParams();
+ sp.setTimeout(30.0);
+
+ SourceSession ss = src.mb.createSourceSession(src_rr, sp);
+ IntermediateSession is = pxy.mb.createIntermediateSession("session", true, pxy_mr, pxy_rr);
+ DestinationSession ds = dst.mb.createDestinationSession("session", true, dst_mr);
+
+ // wait for slobrok registration
+ assertTrue(src.waitSlobrok("test/pxy/session", 1));
+ assertTrue(src.waitSlobrok("test/dst/session", 1));
+ assertTrue(pxy.waitSlobrok("test/dst/session", 1));
+
+ // send message on client
+ ss.send(new SimpleMessage("test message"), "test");
+
+ // check message on proxy
+ Message msg = pxy_mr.getMessage(60);
+ assertTrue(msg != null);
+ assertEquals(SimpleProtocol.MESSAGE, msg.getType());
+ SimpleMessage sm = (SimpleMessage) msg;
+ assertEquals("test message", sm.getValue());
+
+ // forward message on proxy
+ sm.setValue(sm.getValue() + " pxy");
+ is.forward(sm);
+
+ // check message on server
+ msg = dst_mr.getMessage(60);
+ assertTrue(msg != null);
+ assertEquals(SimpleProtocol.MESSAGE, msg.getType());
+ sm = (SimpleMessage) msg;
+ assertEquals("test message pxy", sm.getValue());
+
+ // send reply on server
+ SimpleReply sr = new SimpleReply("test reply");
+ sm.swapState(sr);
+ ds.reply(sr);
+
+ // check reply on proxy
+ Reply reply = pxy_rr.getReply(60);
+ assertTrue(reply != null);
+ assertEquals(SimpleProtocol.REPLY, reply.getType());
+ sr = (SimpleReply) reply;
+ assertEquals("test reply", sr.getValue());
+
+ // forward reply on proxy
+ sr.setValue(sr.getValue() + " pxy");
+ is.forward(sr);
+
+ // check reply on client
+ reply = src_rr.getReply(60);
+ assertTrue(reply != null);
+ assertEquals(SimpleProtocol.REPLY, reply.getType());
+ sr = (SimpleReply) reply;
+ assertEquals("test reply pxy", sr.getValue());
+
+ ss.destroy();
+ is.destroy();
+ ds.destroy();
+ }
+
+ public void testTimeoutsFollowMessage() {
+ SourceSessionParams params = new SourceSessionParams().setTimeout(600.0);
+ SourceSession ss = src.mb.createSourceSession(new Receptor(), params);
+ DestinationSession ds = dst.mb.createDestinationSession("session", true, new Receptor());
+ assertTrue(src.waitSlobrok("test/dst/session", 1));
+
+ // Test default timeouts being set.
+ Message msg = new SimpleMessage("msg");
+ msg.getTrace().setLevel(9);
+ long now = SystemTimer.INSTANCE.milliTime();
+ assertTrue(ss.send(msg, Route.parse("dst")).isAccepted());
+
+ assertNotNull(msg = ((Receptor)ds.getMessageHandler()).getMessage(60));
+ assertTrue(msg.getTimeReceived() >= now);
+ assertTrue(params.getTimeout() * 1000 >= msg.getTimeRemaining());
+ ds.acknowledge(msg);
+
+ assertNotNull(((Receptor)ss.getReplyHandler()).getReply(60));
+
+ // Test default timeouts being overwritten.
+ msg = new SimpleMessage("msg");
+ msg.getTrace().setLevel(9);
+ msg.setTimeRemaining(2 * (long)(params.getTimeout() * 1000));
+ assertTrue(ss.send(msg, Route.parse("dst")).isAccepted());
+
+ assertNotNull(msg = ((Receptor)ds.getMessageHandler()).getMessage(60));
+ assertTrue(params.getTimeout() * 1000 < msg.getTimeRemaining());
+ ds.acknowledge(msg);
+
+ assertNotNull(((Receptor)ss.getReplyHandler()).getReply(60));
+
+ ss.destroy();
+ ds.destroy();
+ }
+}
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/LoadBalanceTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/LoadBalanceTestCase.java
new file mode 100644
index 00000000000..3f6c449679e
--- /dev/null
+++ b/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/LoadBalanceTestCase.java
@@ -0,0 +1,96 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.network.rpc;
+
+import com.yahoo.jrt.ListenFailedException;
+import com.yahoo.jrt.slobrok.server.Slobrok;
+import com.yahoo.messagebus.*;
+import com.yahoo.messagebus.network.Identity;
+import com.yahoo.messagebus.network.rpc.test.TestServer;
+import com.yahoo.messagebus.routing.Route;
+import com.yahoo.messagebus.test.QueueAdapter;
+import com.yahoo.messagebus.test.SimpleMessage;
+
+import java.net.UnknownHostException;
+
+/**
+ * @author <a href="mailto:havardpe@yahoo-inc.com">Haavard Pettersen</a>
+ */
+public class LoadBalanceTestCase extends junit.framework.TestCase {
+
+ public void testLoadBalance() throws ListenFailedException, UnknownHostException {
+ Slobrok slobrok = new Slobrok();
+ TestServer src = new TestServer("src", null, slobrok, null, null);
+ TestServer dst1 = new TestServer("dst/1", null, slobrok, null, null);
+ TestServer dst2 = new TestServer("dst/2", null, slobrok, null, null);
+ TestServer dst3 = new TestServer("dst/3", null, slobrok, null, null);
+
+ // set up handlers
+ final QueueAdapter sq = new QueueAdapter();
+ SourceSession ss = src.mb.createSourceSession(new SourceSessionParams().setTimeout(600.0).setThrottlePolicy(null)
+ .setReplyHandler(new ReplyHandler() {
+ @Override
+ public void handleReply(Reply reply) {
+ System.out.println(Thread.currentThread().getName() + ": Reply '" +
+ ((SimpleMessage)reply.getMessage()).getValue() + "' received at source.");
+ sq.handleReply(reply);
+ }
+ }));
+ SimpleDestination h1 = new SimpleDestination(dst1.mb, dst1.net.getIdentity());
+ SimpleDestination h2 = new SimpleDestination(dst2.mb, dst2.net.getIdentity());
+ SimpleDestination h3 = new SimpleDestination(dst3.mb, dst3.net.getIdentity());
+ assertTrue(src.waitSlobrok("dst/*/session", 3));
+
+ // send messages
+ int msgCnt = 30; // should be divisible by 3
+ for (int i = 0; i < msgCnt; ++i) {
+ ss.send(new SimpleMessage("msg" + i), Route.parse("dst/*/session"));
+ }
+
+ // wait for replies
+ assertTrue(sq.waitSize(msgCnt, 60));
+
+ // check handler message distribution
+ assertEquals(msgCnt / 3, h1.getCount());
+ assertEquals(msgCnt / 3, h2.getCount());
+ assertEquals(msgCnt / 3, h3.getCount());
+
+ ss.destroy();
+ h1.session.destroy();
+ h2.session.destroy();
+ h3.session.destroy();
+
+ dst3.destroy();
+ dst2.destroy();
+ dst1.destroy();
+ src.destroy();
+ slobrok.stop();
+ }
+
+ /**
+ * Implements a simple destination that counts and acknowledges all messages received.
+ */
+ private static class SimpleDestination implements MessageHandler {
+
+ final DestinationSession session;
+ final String ident;
+ int cnt = 0;
+
+ SimpleDestination(MessageBus mb, Identity ident) {
+ this.session = mb.createDestinationSession("session", true, this);
+ this.ident = ident.getServicePrefix();
+ }
+
+ @Override
+ public synchronized void handleMessage(Message msg) {
+ System.out.println(
+ Thread.currentThread().getName() + ": " +
+ "Message '" + ((SimpleMessage)msg).getValue() + "' received at '" + ident + "'.");
+ session.acknowledge(msg);
+ ++cnt;
+ }
+
+ public synchronized int getCount() {
+ return cnt;
+ }
+ }
+}
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/OOSTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/OOSTestCase.java
new file mode 100755
index 00000000000..e65621e6a10
--- /dev/null
+++ b/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/OOSTestCase.java
@@ -0,0 +1,200 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.network.rpc;
+
+import com.yahoo.jrt.ListenFailedException;
+import com.yahoo.jrt.slobrok.server.Slobrok;
+import com.yahoo.messagebus.*;
+import com.yahoo.messagebus.network.Identity;
+import com.yahoo.messagebus.network.rpc.test.OOSServer;
+import com.yahoo.messagebus.network.rpc.test.OOSState;
+import com.yahoo.messagebus.network.rpc.test.TestServer;
+import com.yahoo.messagebus.routing.Route;
+import com.yahoo.messagebus.test.Receptor;
+import com.yahoo.messagebus.test.SimpleMessage;
+import com.yahoo.messagebus.test.SimpleProtocol;
+
+import java.net.UnknownHostException;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public class OOSTestCase extends junit.framework.TestCase {
+
+ private static class MyServer extends TestServer implements MessageHandler {
+ DestinationSession session;
+
+ public MyServer(String name, Slobrok slobrok, String oosServerPattern)
+ throws ListenFailedException, UnknownHostException
+ {
+ super(new MessageBusParams().setRetryPolicy(null).addProtocol(new SimpleProtocol()),
+ new RPCNetworkParams()
+ .setIdentity(new Identity(name))
+ .setSlobrokConfigId(TestServer.getSlobrokConfig(slobrok))
+ .setOOSServerPattern(oosServerPattern));
+ session = mb.createDestinationSession("session", true, this);
+ }
+
+ public boolean destroy() {
+ session.destroy();
+ return super.destroy();
+ }
+
+ public void handleMessage(Message msg) {
+ session.acknowledge(msg);
+ }
+ }
+
+ private static void assertError(SourceSession src, String dst, int error) {
+ Message msg = new SimpleMessage("msg");
+ msg.getTrace().setLevel(9);
+ assertTrue(src.send(msg, Route.parse(dst)).isAccepted());
+ Reply reply = ((Receptor) src.getReplyHandler()).getReply(60);
+ assertNotNull(reply);
+ System.out.println(reply.getTrace());
+ if (error == ErrorCode.NONE) {
+ assertFalse(reply.hasErrors());
+ } else {
+ assertTrue(reply.hasErrors());
+ assertEquals(error, reply.getError(0).getCode());
+ }
+ }
+
+ public void testOOS() throws ListenFailedException, UnknownHostException {
+ Slobrok slobrok = new Slobrok();
+ TestServer srcServer = new TestServer("src", null, slobrok, "oos/*", null);
+ SourceSession srcSession = srcServer.mb.createSourceSession(new Receptor());
+
+ MyServer dst1 = new MyServer("dst1", slobrok, null);
+ MyServer dst2 = new MyServer("dst2", slobrok, null);
+ MyServer dst3 = new MyServer("dst3", slobrok, null);
+ MyServer dst4 = new MyServer("dst4", slobrok, null);
+ MyServer dst5 = new MyServer("dst5", slobrok, null);
+ assertTrue(srcServer.waitSlobrok("*/session", 5));
+
+ // Ensure that normal sending is ok.
+ assertError(srcSession, "dst1/session", ErrorCode.NONE);
+ assertError(srcSession, "dst2/session", ErrorCode.NONE);
+ assertError(srcSession, "dst3/session", ErrorCode.NONE);
+ assertError(srcSession, "dst4/session", ErrorCode.NONE);
+ assertError(srcSession, "dst5/session", ErrorCode.NONE);
+
+ // Ensure that 2 OOS services report properly.
+ OOSServer oosServer = new OOSServer(slobrok, "oos/1", new OOSState()
+ .add("dst2/session", true)
+ .add("dst3/session", true));
+ assertTrue(srcServer.waitSlobrok("oos/*", 1));
+ assertTrue(srcServer.waitState(new OOSState()
+ .add("dst2/session", true)
+ .add("dst3/session", true)));
+ assertError(srcSession, "dst1/session", ErrorCode.NONE);
+ assertError(srcSession, "dst2/session", ErrorCode.SERVICE_OOS);
+ assertError(srcSession, "dst3/session", ErrorCode.SERVICE_OOS);
+ assertError(srcSession, "dst4/session", ErrorCode.NONE);
+ assertError(srcSession, "dst5/session", ErrorCode.NONE);
+
+ // Ensure that 1 OOS service may come up while other stays down.
+ oosServer.setState(new OOSState().add("dst2/session", true));
+ assertTrue(srcServer.waitState(new OOSState()
+ .add("dst2/session", true)
+ .add("dst3/session", false)));
+ assertError(srcSession, "dst1/session", ErrorCode.NONE);
+ assertError(srcSession, "dst2/session", ErrorCode.SERVICE_OOS);
+ assertError(srcSession, "dst3/session", ErrorCode.NONE);
+ assertError(srcSession, "dst4/session", ErrorCode.NONE);
+ assertError(srcSession, "dst5/session", ErrorCode.NONE);
+
+ // Add another OOS server and make sure that it works properly.
+ OOSServer oosServer2 = new OOSServer(slobrok, "oos/2", new OOSState()
+ .add("dst4/session", true)
+ .add("dst5/session", true));
+ assertTrue(srcServer.waitSlobrok("oos/*", 2));
+ assertTrue(srcServer.waitState(new OOSState()
+ .add("dst2/session", true)
+ .add("dst4/session", true)
+ .add("dst5/session", true)));
+ assertError(srcSession, "dst1/session", ErrorCode.NONE);
+ assertError(srcSession, "dst2/session", ErrorCode.SERVICE_OOS);
+ assertError(srcSession, "dst3/session", ErrorCode.NONE);
+ assertError(srcSession, "dst4/session", ErrorCode.SERVICE_OOS);
+ assertError(srcSession, "dst5/session", ErrorCode.SERVICE_OOS);
+ oosServer2.shutdown();
+
+ // Ensure that shutting down one OOS server will properly propagate.
+ assertTrue(srcServer.waitSlobrok("oos/*", 1));
+ assertTrue(srcServer.waitState(new OOSState()
+ .add("dst1/session", false)
+ .add("dst2/session", true)
+ .add("dst3/session", false)
+ .add("dst4/session", false)
+ .add("dst5/session", false)));
+ assertError(srcSession, "dst1/session", ErrorCode.NONE);
+ assertError(srcSession, "dst2/session", ErrorCode.SERVICE_OOS);
+ assertError(srcSession, "dst3/session", ErrorCode.NONE);
+ assertError(srcSession, "dst4/session", ErrorCode.NONE);
+ assertError(srcSession, "dst5/session", ErrorCode.NONE);
+
+ // Now add two new OOS servers and make sure that works too.
+ OOSServer oosServer3 = new OOSServer(slobrok, "oos/3", new OOSState()
+ .add("dst2/session", true)
+ .add("dst4/session", true));
+ OOSServer oosServer4 = new OOSServer(slobrok, "oos/4", new OOSState()
+ .add("dst2/session", true)
+ .add("dst3/session", true)
+ .add("dst5/session", true));
+ assertTrue(srcServer.waitSlobrok("oos/*", 3));
+ assertTrue(srcServer.waitState(new OOSState()
+ .add("dst2/session", true)
+ .add("dst3/session", true)
+ .add("dst4/session", true)
+ .add("dst5/session", true)));
+ assertError(srcSession, "dst1/session", ErrorCode.NONE);
+ assertError(srcSession, "dst2/session", ErrorCode.SERVICE_OOS);
+ assertError(srcSession, "dst3/session", ErrorCode.SERVICE_OOS);
+ assertError(srcSession, "dst4/session", ErrorCode.SERVICE_OOS);
+ assertError(srcSession, "dst5/session", ErrorCode.SERVICE_OOS);
+
+ // Modify the state of the two new servers and make sure it propagates.
+ oosServer3.setState(new OOSState()
+ .add("dst2/session", true));
+ oosServer4.setState(new OOSState()
+ .add("dst1/session", true));
+ assertTrue(srcServer.waitState(new OOSState()
+ .add("dst1/session", true)
+ .add("dst2/session", true)
+ .add("dst3/session", false)
+ .add("dst4/session", false)
+ .add("dst5/session", false)));
+ assertError(srcSession, "dst1/session", ErrorCode.SERVICE_OOS);
+ assertError(srcSession, "dst2/session", ErrorCode.SERVICE_OOS);
+ assertError(srcSession, "dst3/session", ErrorCode.NONE);
+ assertError(srcSession, "dst4/session", ErrorCode.NONE);
+ assertError(srcSession, "dst5/session", ErrorCode.NONE);
+ oosServer3.shutdown();
+ oosServer4.shutdown();
+
+ // Ensure that shutting down the two latest OOS servers works properly.
+ assertTrue(srcServer.waitSlobrok("oos/*", 1));
+ assertTrue(srcServer.waitState(new OOSState()
+ .add("dst1/session", false)
+ .add("dst2/session", true)
+ .add("dst3/session", false)
+ .add("dst4/session", false)
+ .add("dst5/session", false)));
+ assertError(srcSession, "dst1/session", ErrorCode.NONE);
+ assertError(srcSession, "dst2/session", ErrorCode.SERVICE_OOS);
+ assertError(srcSession, "dst3/session", ErrorCode.NONE);
+ assertError(srcSession, "dst4/session", ErrorCode.NONE);
+ assertError(srcSession, "dst5/session", ErrorCode.NONE);
+
+ dst2.destroy();
+ assertTrue(srcServer.waitSlobrok("*/session", 4));
+ assertError(srcSession, "dst2/session", ErrorCode.SERVICE_OOS);
+
+ srcSession.destroy();
+ dst1.destroy();
+ dst2.destroy();
+ dst3.destroy();
+ dst4.destroy();
+ dst5.destroy();
+ }
+}
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/RPCNetworkTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/RPCNetworkTestCase.java
new file mode 100644
index 00000000000..b2927611248
--- /dev/null
+++ b/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/RPCNetworkTestCase.java
@@ -0,0 +1,100 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.network.rpc;
+
+import com.yahoo.component.Version;
+import com.yahoo.jrt.slobrok.server.Slobrok;
+import com.yahoo.messagebus.*;
+import com.yahoo.messagebus.metrics.MetricSet;
+import com.yahoo.messagebus.network.rpc.test.TestServer;
+import com.yahoo.messagebus.routing.Route;
+import com.yahoo.messagebus.routing.RoutingPolicy;
+import com.yahoo.messagebus.test.Receptor;
+import com.yahoo.text.Utf8String;
+import org.junit.Test;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+
+import static org.junit.Assert.*;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen Hult</a>
+ */
+public class RPCNetworkTestCase {
+
+ @Test
+ public void requireThatProtocolEncodeExceptionIsCaught() throws Exception {
+ RuntimeException e = new RuntimeException();
+
+ Slobrok slobrok = new Slobrok();
+ TestServer server = new TestServer(new MessageBusParams().addProtocol(MyProtocol.newEncodeException(e)),
+ new RPCNetworkParams().setSlobrokConfigId(slobrok.configId()));
+ Receptor receptor = new Receptor();
+ SourceSession src = server.mb.createSourceSession(
+ new SourceSessionParams().setTimeout(600.0).setReplyHandler(receptor));
+ DestinationSession dst = server.mb.createDestinationSession(new DestinationSessionParams());
+ assertTrue(src.send(new MyMessage().setRoute(Route.parse(dst.getConnectionSpec()))).isAccepted());
+
+ Reply reply = receptor.getReply(60);
+ assertNotNull(reply);
+ assertEquals(1, reply.getNumErrors());
+
+ StringWriter expected = new StringWriter();
+ e.printStackTrace(new PrintWriter(expected));
+
+ String actual = reply.getError(0).toString();
+ assertTrue(actual, actual.contains(expected.toString()));
+ }
+
+ private static class MyMessage extends Message {
+
+ @Override
+ public Utf8String getProtocol() {
+ return new Utf8String(MyProtocol.NAME);
+ }
+
+ @Override
+ public int getType() {
+ return 0;
+ }
+ }
+
+ private static class MyProtocol implements Protocol {
+
+ final static String NAME = "myProtocol";
+ final RuntimeException encodeException;
+
+ MyProtocol(RuntimeException encodeException) {
+ this.encodeException = encodeException;
+ }
+
+ @Override
+ public String getName() {
+ return NAME;
+ }
+
+ @Override
+ public byte[] encode(Version version, Routable routable) {
+ throw encodeException;
+ }
+
+ @Override
+ public Routable decode(Version version, byte[] payload) {
+ return null;
+ }
+
+ @Override
+ public RoutingPolicy createPolicy(String name, String param) {
+ return null;
+ }
+
+ @Override
+ public MetricSet getMetrics() {
+ return null;
+ }
+
+ static MyProtocol newEncodeException(RuntimeException e) {
+ return new MyProtocol(e);
+ }
+ }
+}
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/SendAdapterTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/SendAdapterTestCase.java
new file mode 100755
index 00000000000..d296d7c7058
--- /dev/null
+++ b/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/SendAdapterTestCase.java
@@ -0,0 +1,157 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.network.rpc;
+
+import com.yahoo.component.Version;
+import com.yahoo.jrt.ListenFailedException;
+import com.yahoo.jrt.slobrok.server.Slobrok;
+import com.yahoo.messagebus.*;
+import com.yahoo.messagebus.network.Identity;
+import com.yahoo.messagebus.network.rpc.test.TestServer;
+import com.yahoo.messagebus.routing.Route;
+import com.yahoo.messagebus.test.Receptor;
+import com.yahoo.messagebus.test.SimpleMessage;
+import com.yahoo.messagebus.test.SimpleProtocol;
+import com.yahoo.messagebus.test.SimpleReply;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.*;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public class SendAdapterTestCase {
+
+ ////////////////////////////////////////////////////////////////////////////////
+ //
+ // Setup
+ //
+ ////////////////////////////////////////////////////////////////////////////////
+
+ Slobrok slobrok;
+ TestServer srcServer, itrServer, dstServer;
+ SourceSession srcSession;
+ IntermediateSession itrSession;
+ DestinationSession dstSession;
+ TestProtocol srcProtocol, itrProtocol, dstProtocol;
+
+ @Before
+ public void setUp() throws ListenFailedException, UnknownHostException {
+ slobrok = new Slobrok();
+ dstServer = new TestServer(
+ new MessageBusParams().addProtocol(dstProtocol = new TestProtocol()),
+ new RPCNetworkParams().setIdentity(new Identity("dst")).setSlobrokConfigId(TestServer.getSlobrokConfig(slobrok)));
+ dstSession = dstServer.mb.createDestinationSession(
+ new DestinationSessionParams().setName("session").setMessageHandler(new Receptor()));
+ itrServer = new TestServer(
+ new MessageBusParams().addProtocol(itrProtocol = new TestProtocol()),
+ new RPCNetworkParams().setIdentity(new Identity("itr")).setSlobrokConfigId(TestServer.getSlobrokConfig(slobrok)));
+ itrSession = itrServer.mb.createIntermediateSession(
+ new IntermediateSessionParams().setName("session").setMessageHandler(new Receptor()).setReplyHandler(new Receptor()));
+ srcServer = new TestServer(
+ new MessageBusParams().addProtocol(srcProtocol = new TestProtocol()),
+ new RPCNetworkParams().setSlobrokConfigId(TestServer.getSlobrokConfig(slobrok)));
+ srcSession = srcServer.mb.createSourceSession(
+ new SourceSessionParams().setTimeout(600.0).setReplyHandler(new Receptor()));
+ assertTrue(srcServer.waitSlobrok("*/session", 2));
+ }
+
+ @After
+ public void tearDown() {
+ slobrok.stop();
+ dstSession.destroy();
+ dstServer.destroy();
+ itrSession.destroy();
+ itrServer.destroy();
+ srcSession.destroy();
+ srcServer.destroy();
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////
+ //
+ // Tests
+ //
+ ////////////////////////////////////////////////////////////////////////////////
+
+ @Test
+ public void requireThatMessagesCanBeSentAcrossAllSupportedVersions() throws Exception {
+ List<Version> versions = Arrays.asList(new Version(5, 0), new Version(5, 1));
+ for (Version srcVersion : versions) {
+ for (Version itrVersion : versions) {
+ for (Version dstVersion : versions) {
+ assertVersionedSend(srcVersion, itrVersion, dstVersion);
+ }
+ }
+ }
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////
+ //
+ // Utilities
+ //
+ ////////////////////////////////////////////////////////////////////////////////
+
+ private void assertVersionedSend(Version srcVersion, Version itrVersion, Version dstVersion) {
+ System.out.println("Sending from " + srcVersion + " through " + itrVersion + " to " + dstVersion + ":");
+ srcServer.net.setVersion(srcVersion);
+ itrServer.net.setVersion(itrVersion);
+ dstServer.net.setVersion(dstVersion);
+
+ Message msg = new SimpleMessage("foo");
+ msg.getTrace().setLevel(9);
+ assertTrue(srcSession.send(msg, Route.parse("itr/session dst/session")).isAccepted());
+ assertNotNull(msg = ((Receptor)itrSession.getMessageHandler()).getMessage(300));
+ System.out.println("\tMessage version " + srcProtocol.lastVersion + " serialized at source.");
+ Version minVersion = srcVersion.compareTo(itrVersion) < 0 ? srcVersion : itrVersion;
+ assertEquals(minVersion, srcProtocol.lastVersion);
+
+ System.out.println("\tMessage version " + itrProtocol.lastVersion + " reached intermediate.");
+ assertEquals(minVersion, itrProtocol.lastVersion);
+ itrSession.forward(msg);
+ assertNotNull(msg = ((Receptor)dstSession.getMessageHandler()).getMessage(300));
+ System.out.println("\tMessage version " + itrProtocol.lastVersion + " serialized at intermediate.");
+ minVersion = itrVersion.compareTo(dstVersion) < 0 ? itrVersion : dstVersion;
+ assertEquals(minVersion, itrProtocol.lastVersion);
+
+ System.out.println("\tMessage version " + dstProtocol.lastVersion + " reached destination.");
+ assertEquals(minVersion, dstProtocol.lastVersion);
+ Reply reply = new SimpleReply("bar");
+ reply.swapState(msg);
+ dstSession.reply(reply);
+ assertNotNull(reply = ((Receptor)itrSession.getReplyHandler()).getReply(300));
+ System.out.println("\tReply version " + dstProtocol.lastVersion + " serialized at destination.");
+ assertEquals(minVersion, dstProtocol.lastVersion);
+
+ System.out.println("\tReply version " + itrProtocol.lastVersion + " reached intermediate.");
+ assertEquals(minVersion, itrProtocol.lastVersion);
+ itrSession.forward(reply);
+ assertNotNull(((Receptor)srcSession.getReplyHandler()).getReply(300));
+ System.out.println("\tReply version " + itrProtocol.lastVersion + " serialized at intermediate.");
+ minVersion = srcVersion.compareTo(itrVersion) < 0 ? srcVersion : itrVersion;
+ assertEquals(minVersion, itrProtocol.lastVersion);
+
+ System.out.println("\tReply version " + srcProtocol.lastVersion + " reached source.");
+ assertEquals(minVersion, srcProtocol.lastVersion);
+ }
+
+ private static class TestProtocol extends SimpleProtocol {
+
+ Version lastVersion;
+
+ @Override
+ public byte[] encode(Version version, Routable routable) {
+ lastVersion = version;
+ return super.encode(version, routable);
+ }
+
+ public Routable decode(Version version, byte[] payload) {
+ lastVersion = version;
+ return super.decode(version, payload);
+ }
+ }
+}
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/ServiceAddressTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/ServiceAddressTestCase.java
new file mode 100755
index 00000000000..e69896e2bcf
--- /dev/null
+++ b/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/ServiceAddressTestCase.java
@@ -0,0 +1,90 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.network.rpc;
+
+import com.yahoo.jrt.ListenFailedException;
+import com.yahoo.jrt.Spec;
+import com.yahoo.jrt.slobrok.api.Mirror;
+import com.yahoo.jrt.slobrok.server.Slobrok;
+import com.yahoo.messagebus.network.Identity;
+
+import java.net.UnknownHostException;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public class ServiceAddressTestCase extends junit.framework.TestCase {
+
+ private Slobrok slobrok;
+ private RPCNetwork network;
+
+ public ServiceAddressTestCase(String msg) {
+ super(msg);
+ }
+
+ public void setUp() throws ListenFailedException, UnknownHostException {
+ slobrok = new Slobrok();
+ network = new RPCNetwork(new RPCNetworkParams()
+ .setIdentity(new Identity("foo"))
+ .setSlobrokConfigId("raw:slobrok[1]\nslobrok[0].connectionspec \"" +
+ new Spec("localhost", slobrok.port()).toString() + "\"\n"));
+ }
+
+ public void tearDown() {
+ network.shutdown();
+ slobrok.stop();
+ }
+
+ public void testAddrServiceAddress() {
+ assertNullAddress("tcp");
+ assertNullAddress("tcp/");
+ assertNullAddress("tcp/localhost");
+ assertNullAddress("tcp/localhost:");
+ assertNullAddress("tcp/localhost:1977");
+ assertNullAddress("tcp/localhost:1977/");
+ assertAddress("tcp/localhost:1977/session", "tcp/localhost:1977", "session");
+ assertNullAddress("tcp/localhost:/session");
+ //assertNullAddress("tcp/:1977/session");
+ assertNullAddress("tcp/:/session");
+ }
+
+ public void testNameServiceAddress() {
+ network.unregisterSession("session");
+ assertTrue(waitSlobrok("foo/session", 0));
+ assertNullAddress("foo/session");
+
+ network.registerSession("session");
+ assertTrue(waitSlobrok("foo/session", 1));
+ assertAddress("foo/session", network.getConnectionSpec(), "session");
+ }
+
+ private boolean waitSlobrok(String pattern, int num) {
+ for (int i = 0; i < 1000 && !Thread.currentThread().isInterrupted(); ++i) {
+ Mirror.Entry[] res = network.getMirror().lookup(pattern);
+ if (res.length == num) {
+ return true;
+ }
+ try {
+ Thread.sleep(10);
+ }
+ catch (InterruptedException e) {
+ // ignore
+ }
+ }
+ return false;
+ }
+
+ private void assertNullAddress(String pattern) {
+ assertNull(new RPCService(network.getMirror(), pattern).resolve());
+ }
+
+ private void assertAddress(String pattern, String expectedSpec, String expectedSession) {
+ RPCService service = new RPCService(network.getMirror(), pattern);
+ RPCServiceAddress obj = service.resolve();
+ assertNotNull(obj);
+ assertNotNull(obj.getConnectionSpec());
+ assertEquals(expectedSpec, obj.getConnectionSpec().toString());
+ if (expectedSession != null) {
+ assertEquals(expectedSession, obj.getSessionName());
+ }
+ }
+}
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/ServicePoolTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/ServicePoolTestCase.java
new file mode 100644
index 00000000000..42580b963a5
--- /dev/null
+++ b/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/ServicePoolTestCase.java
@@ -0,0 +1,57 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.network.rpc;
+
+import com.yahoo.jrt.ListenFailedException;
+import com.yahoo.jrt.slobrok.server.Slobrok;
+import com.yahoo.messagebus.network.rpc.test.TestServer;
+import junit.framework.TestCase;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public class ServicePoolTestCase extends TestCase {
+
+ public void testMaxSize() throws ListenFailedException {
+ Slobrok slobrok = new Slobrok();
+ RPCNetwork net = new RPCNetwork(new RPCNetworkParams().setSlobrokConfigId(TestServer.getSlobrokConfig(slobrok)));
+ RPCServicePool pool = new RPCServicePool(net, 2);
+
+ pool.resolve("foo");
+ assertEquals(1, pool.getSize());
+ assertTrue(pool.hasService("foo"));
+ assertTrue(!pool.hasService("bar"));
+ assertTrue(!pool.hasService("baz"));
+
+ pool.resolve("foo");
+ assertEquals(1, pool.getSize());
+ assertTrue(pool.hasService("foo"));
+ assertTrue(!pool.hasService("bar"));
+ assertTrue(!pool.hasService("baz"));
+
+ pool.resolve("bar");
+ assertEquals(2, pool.getSize());
+ assertTrue(pool.hasService("foo"));
+ assertTrue(pool.hasService("bar"));
+ assertTrue(!pool.hasService("baz"));
+
+ pool.resolve("baz");
+ assertEquals(2, pool.getSize());
+ assertTrue(!pool.hasService("foo"));
+ assertTrue(pool.hasService("bar"));
+ assertTrue(pool.hasService("baz"));
+
+ pool.resolve("bar");
+ assertEquals(2, pool.getSize());
+ assertTrue(!pool.hasService("foo"));
+ assertTrue(pool.hasService("bar"));
+ assertTrue(pool.hasService("baz"));
+
+ pool.resolve("foo");
+ assertEquals(2, pool.getSize());
+ assertTrue(pool.hasService("foo"));
+ assertTrue(pool.hasService("bar"));
+ assertTrue(!pool.hasService("baz"));
+
+ slobrok.stop();
+ }
+} \ No newline at end of file
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/SlobrokTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/SlobrokTestCase.java
new file mode 100644
index 00000000000..3f502d4da2f
--- /dev/null
+++ b/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/SlobrokTestCase.java
@@ -0,0 +1,151 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.network.rpc;
+
+
+import com.yahoo.jrt.ListenFailedException;
+import com.yahoo.jrt.Spec;
+import com.yahoo.jrt.slobrok.api.Mirror;
+import com.yahoo.jrt.slobrok.server.Slobrok;
+import com.yahoo.messagebus.network.Identity;
+
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+
+
+/**
+ * @author <a href="mailto:havardpe@yahoo-inc.com">Haavard Pettersen</a>
+ */
+public class SlobrokTestCase extends junit.framework.TestCase {
+
+ private static class Res {
+ private List<Mirror.Entry> lst = new ArrayList<>();
+ public Res add(String fullName, String spec) {
+ lst.add(new Mirror.Entry(fullName, spec));
+ return this;
+ }
+ public Mirror.Entry[] toArray() {
+ return lst.toArray(new Mirror.Entry[lst.size()]);
+ }
+ }
+
+
+ public SlobrokTestCase(String message) {
+ super(message);
+ }
+
+ Slobrok slobrok;
+ RPCNetwork net1;
+ RPCNetwork net2;
+ RPCNetwork net3;
+ int port1;
+ int port2;
+ int port3;
+
+ void check(RPCNetwork net, String pattern, Mirror.Entry[] expect) {
+ Comparator<Mirror.Entry> cmp = new Comparator<Mirror.Entry>() {
+ public int compare(Mirror.Entry a, Mirror.Entry b) {
+ return a.compareTo(b);
+ }
+ };
+ Arrays.sort(expect, cmp);
+ Mirror.Entry[] actual = null;
+ for (int i = 0; i < 1000; i++) {
+ actual = net.getMirror().lookup(pattern);
+ Arrays.sort(actual, cmp);
+ if (Arrays.equals(actual, expect)) {
+ System.out.printf("lookup successful for pattern: %s\n", pattern);
+ return;
+ }
+ try { Thread.sleep(10); } catch (InterruptedException e) {
+ //
+ }
+ }
+ System.out.printf("lookup failed for pattern: %s\n", pattern);
+ System.out.printf("actual values:\n");
+ if (actual == null || actual.length == 0) {
+ System.out.printf(" { EMPTY }\n");
+ } else {
+ for (Mirror.Entry entry : actual) {
+ System.out.printf(" { %s, %s }\n", entry.getName(), entry.getSpec());
+ }
+ }
+ System.out.printf("expected values:\n");
+ if (expect.length == 0) {
+ System.out.printf(" { EMPTY }\n");
+ } else {
+ for (Mirror.Entry entry : expect) {
+ System.out.printf(" { %s, %s }\n", entry.getName(), entry.getSpec());
+ }
+ }
+ assertTrue(false);
+ }
+
+ public void setUp() throws ListenFailedException, UnknownHostException {
+ slobrok = new Slobrok();
+ String slobrokCfgId = "raw:slobrok[1]\nslobrok[0].connectionspec \"" + new Spec("localhost", slobrok.port()).toString() + "\"\n";
+ net1 = new RPCNetwork(new RPCNetworkParams().setIdentity(new Identity("net/a")).setSlobrokConfigId(slobrokCfgId));
+ net2 = new RPCNetwork(new RPCNetworkParams().setIdentity(new Identity("net/b")).setSlobrokConfigId(slobrokCfgId));
+ net3 = new RPCNetwork(new RPCNetworkParams().setIdentity(new Identity("net/c")).setSlobrokConfigId(slobrokCfgId));
+ port1 = net1.getPort();
+ port2 = net2.getPort();
+ port3 = net3.getPort();
+ }
+
+ public void tearDown() {
+ net3.shutdown();
+ net2.shutdown();
+ net1.shutdown();
+ slobrok.stop();
+ }
+
+ public void testSlobrok() {
+ net1.registerSession("foo");
+ net2.registerSession("foo");
+ net2.registerSession("bar");
+ net3.registerSession("foo");
+ net3.registerSession("bar");
+ net3.registerSession("baz");
+
+ check(net1, "*/*/*", new Res()
+ .add("net/a/foo", net1.getConnectionSpec())
+ .add("net/b/foo", net2.getConnectionSpec())
+ .add("net/b/bar", net2.getConnectionSpec())
+ .add("net/c/foo", net3.getConnectionSpec())
+ .add("net/c/bar", net3.getConnectionSpec())
+ .add("net/c/baz", net3.getConnectionSpec()).toArray());
+ check(net2, "*/*/*", new Res()
+ .add("net/a/foo", net1.getConnectionSpec())
+ .add("net/b/foo", net2.getConnectionSpec())
+ .add("net/b/bar", net2.getConnectionSpec())
+ .add("net/c/foo", net3.getConnectionSpec())
+ .add("net/c/bar", net3.getConnectionSpec())
+ .add("net/c/baz", net3.getConnectionSpec()).toArray());
+ check(net3, "*/*/*", new Res()
+ .add("net/a/foo", net1.getConnectionSpec())
+ .add("net/b/foo", net2.getConnectionSpec())
+ .add("net/b/bar", net2.getConnectionSpec())
+ .add("net/c/foo", net3.getConnectionSpec())
+ .add("net/c/bar", net3.getConnectionSpec())
+ .add("net/c/baz", net3.getConnectionSpec()).toArray());
+
+ net2.unregisterSession("bar");
+ net3.unregisterSession("bar");
+ net3.unregisterSession("baz");
+
+ check(net1, "*/*/*", new Res()
+ .add("net/a/foo", net1.getConnectionSpec())
+ .add("net/b/foo", net2.getConnectionSpec())
+ .add("net/c/foo", net3.getConnectionSpec()).toArray());
+ check(net2, "*/*/*", new Res()
+ .add("net/a/foo", net1.getConnectionSpec())
+ .add("net/b/foo", net2.getConnectionSpec())
+ .add("net/c/foo", net3.getConnectionSpec()).toArray());
+ check(net3, "*/*/*", new Res()
+ .add("net/a/foo", net1.getConnectionSpec())
+ .add("net/b/foo", net2.getConnectionSpec())
+ .add("net/c/foo", net3.getConnectionSpec()).toArray());
+ }
+}
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/TargetPoolTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/TargetPoolTestCase.java
new file mode 100755
index 00000000000..8d58b4dd89f
--- /dev/null
+++ b/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/TargetPoolTestCase.java
@@ -0,0 +1,112 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.network.rpc;
+
+import com.yahoo.jrt.ListenFailedException;
+import com.yahoo.jrt.Supervisor;
+import com.yahoo.jrt.Transport;
+import com.yahoo.jrt.slobrok.server.Slobrok;
+import com.yahoo.concurrent.Timer;
+import com.yahoo.messagebus.network.rpc.test.TestServer;
+
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public class TargetPoolTestCase extends junit.framework.TestCase {
+
+ private Slobrok slobrok;
+ private List<TestServer> servers;
+ private Supervisor orb;
+
+ @Override
+ public void setUp() throws ListenFailedException {
+ slobrok = new Slobrok();
+ servers = new ArrayList<>();
+ orb = new Supervisor(new Transport());
+ }
+
+ @Override
+ public void tearDown() {
+ slobrok.stop();
+ for (TestServer server : servers) {
+ server.destroy();
+ }
+ orb.transport().shutdown().join();
+ }
+
+ public void testConnectionExpire() throws ListenFailedException, UnknownHostException {
+ // Necessary setup to be able to resolve targets.
+ RPCServiceAddress adr1 = registerServer();
+ RPCServiceAddress adr2 = registerServer();
+ RPCServiceAddress adr3 = registerServer();
+
+ PoolTimer timer = new PoolTimer();
+ RPCTargetPool pool = new RPCTargetPool(timer, 0.666);
+
+ // Assert that all connections expire.
+ RPCTarget target;
+ assertNotNull(target = pool.getTarget(orb, adr1)); target.subRef();
+ assertNotNull(target = pool.getTarget(orb, adr2)); target.subRef();
+ assertNotNull(target = pool.getTarget(orb, adr3)); target.subRef();
+ assertEquals(3, pool.size());
+ for (int i = 0; i < 10; ++i) {
+ pool.flushTargets(false);
+ assertEquals(3, pool.size());
+ }
+ timer.millis += 999;
+ pool.flushTargets(false);
+ assertEquals(0, pool.size());
+
+ // Assert that only idle connections expire.
+ assertNotNull(target = pool.getTarget(orb, adr1)); target.subRef();
+ assertNotNull(target = pool.getTarget(orb, adr2)); target.subRef();
+ assertNotNull(target = pool.getTarget(orb, adr3)); target.subRef();
+ assertEquals(3, pool.size());
+ timer.millis += 444;
+ pool.flushTargets(false);
+ assertEquals(3, pool.size());
+ assertNotNull(target = pool.getTarget(orb, adr2)); target.subRef();
+ assertNotNull(target = pool.getTarget(orb, adr3)); target.subRef();
+ assertEquals(3, pool.size());
+ timer.millis += 444;
+ pool.flushTargets(false);
+ assertEquals(2, pool.size());
+ assertNotNull(target = pool.getTarget(orb, adr3)); target.subRef();
+ timer.millis += 444;
+ pool.flushTargets(false);
+ assertEquals(1, pool.size());
+ timer.millis += 444;
+ pool.flushTargets(false);
+ assertEquals(0, pool.size());
+
+ // Assert that connections never expire while they are referenced.
+ assertNotNull(target = pool.getTarget(orb, adr1));
+ assertEquals(1, pool.size());
+ for (int i = 0; i < 10; ++i) {
+ timer.millis += 999;
+ pool.flushTargets(false);
+ assertEquals(1, pool.size());
+ }
+ target.subRef();
+ timer.millis += 999;
+ pool.flushTargets(false);
+ assertEquals(0, pool.size());
+ }
+
+ private RPCServiceAddress registerServer() throws ListenFailedException, UnknownHostException {
+ servers.add(new TestServer("srv" + servers.size(), null, slobrok, null, null));
+ return new RPCServiceAddress("foo/bar", servers.get(servers.size() - 1).mb.getConnectionSpec());
+ }
+
+ private static class PoolTimer implements Timer {
+ long millis = 0;
+
+ @Override
+ public long milliTime() {
+ return millis;
+ }
+ }
+} \ No newline at end of file
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/routing/AdvancedRoutingTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/routing/AdvancedRoutingTestCase.java
new file mode 100755
index 00000000000..62418c2766b
--- /dev/null
+++ b/messagebus/src/test/java/com/yahoo/messagebus/routing/AdvancedRoutingTestCase.java
@@ -0,0 +1,119 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.routing;
+
+import com.yahoo.jrt.ListenFailedException;
+import com.yahoo.jrt.slobrok.server.Slobrok;
+import com.yahoo.messagebus.*;
+import com.yahoo.messagebus.Error;
+import com.yahoo.messagebus.network.Identity;
+import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
+import com.yahoo.messagebus.network.rpc.test.TestServer;
+import com.yahoo.messagebus.routing.test.CustomPolicyFactory;
+import com.yahoo.messagebus.test.Receptor;
+import com.yahoo.messagebus.test.SimpleMessage;
+import com.yahoo.messagebus.test.SimpleProtocol;
+
+import java.net.UnknownHostException;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public class AdvancedRoutingTestCase extends junit.framework.TestCase {
+
+ ////////////////////////////////////////////////////////////////////////////////
+ //
+ // Setup
+ //
+ ////////////////////////////////////////////////////////////////////////////////
+
+ Slobrok slobrok;
+ TestServer srcServer, dstServer;
+ SourceSession srcSession;
+ DestinationSession dstFoo, dstBar, dstBaz;
+
+ @Override
+ public void setUp() throws ListenFailedException, UnknownHostException {
+ slobrok = new Slobrok();
+ dstServer = new TestServer(new MessageBusParams().addProtocol(new SimpleProtocol()),
+ new RPCNetworkParams().setIdentity(new Identity("dst")).setSlobrokConfigId(TestServer.getSlobrokConfig(slobrok)));
+ dstFoo = dstServer.mb.createDestinationSession(new DestinationSessionParams().setName("foo").setMessageHandler(new Receptor()));
+ dstBar = dstServer.mb.createDestinationSession(new DestinationSessionParams().setName("bar").setMessageHandler(new Receptor()));
+ dstBaz = dstServer.mb.createDestinationSession(new DestinationSessionParams().setName("baz").setMessageHandler(new Receptor()));
+ srcServer = new TestServer(new MessageBusParams().setRetryPolicy(new RetryTransientErrorsPolicy().setBaseDelay(0)).addProtocol(new SimpleProtocol()),
+ new RPCNetworkParams().setSlobrokConfigId(TestServer.getSlobrokConfig(slobrok)));
+ srcSession = srcServer.mb.createSourceSession(
+ new SourceSessionParams().setTimeout(600.0).setReplyHandler(new Receptor()));
+ assertTrue(srcServer.waitSlobrok("dst/*", 3));
+ }
+
+ @Override
+ public void tearDown() {
+ slobrok.stop();
+ dstFoo.destroy();
+ dstBar.destroy();
+ dstBaz.destroy();
+ dstServer.destroy();
+ srcSession.destroy();
+ srcServer.destroy();
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////
+ //
+ // Tests
+ //
+ ////////////////////////////////////////////////////////////////////////////////
+
+ public void testAdvanced() {
+ SimpleProtocol protocol = new SimpleProtocol();
+ protocol.addPolicyFactory("Custom", new CustomPolicyFactory(false, ErrorCode.NO_ADDRESS_FOR_SERVICE));
+ srcServer.mb.putProtocol(protocol);
+ srcServer.setupRouting(new RoutingTableSpec(SimpleProtocol.NAME)
+ .addHop(new HopSpec("bar", "dst/bar"))
+ .addHop(new HopSpec("baz", "dst/baz"))
+ .addRoute(new RouteSpec("baz").addHop("baz")));
+ Message msg = new SimpleMessage("msg");
+ msg.getTrace().setLevel(9);
+ assertTrue(srcSession.send(msg, Route.parse("[Custom:" + dstFoo.getConnectionSpec() + ",bar,route:baz,dst/cox,?dst/unknown]")).isAccepted());
+
+ // Initial send.
+ assertNotNull(msg = ((Receptor)dstFoo.getMessageHandler()).getMessage(60));
+ dstFoo.acknowledge(msg);
+ assertNotNull(msg = ((Receptor)dstBar.getMessageHandler()).getMessage(60));
+ Reply reply = new EmptyReply();
+ reply.swapState(msg);
+ reply.addError(new Error(ErrorCode.TRANSIENT_ERROR, "bar"));
+ dstBar.reply(reply);
+ assertNotNull(msg = ((Receptor)dstBaz.getMessageHandler()).getMessage(60));
+ reply = new EmptyReply();
+ reply.swapState(msg);
+ reply.addError(new Error(ErrorCode.TRANSIENT_ERROR, "baz1"));
+ dstBaz.reply(reply);
+
+ // First retry.
+ assertNull(((Receptor)dstFoo.getMessageHandler()).getMessage(0));
+ assertNotNull(msg = ((Receptor)dstBar.getMessageHandler()).getMessage(60));
+ dstBar.acknowledge(msg);
+ assertNotNull(msg = ((Receptor)dstBaz.getMessageHandler()).getMessage(60));
+ reply = new EmptyReply();
+ reply.swapState(msg);
+ reply.addError(new Error(ErrorCode.TRANSIENT_ERROR, "baz2"));
+ dstBaz.reply(reply);
+
+ // Second retry.
+ assertNull(((Receptor)dstFoo.getMessageHandler()).getMessage(0));
+ assertNull(((Receptor)dstBar.getMessageHandler()).getMessage(0));
+ assertNotNull(msg = ((Receptor)dstBaz.getMessageHandler()).getMessage(60));
+ reply = new EmptyReply();
+ reply.swapState(msg);
+ reply.addError(new Error(ErrorCode.FATAL_ERROR, "baz3"));
+ dstBaz.reply(reply);
+
+ // Done.
+ reply = ((Receptor)srcSession.getReplyHandler()).getReply(60);
+ assertNotNull(reply);
+ System.out.println(reply.getTrace());
+ assertEquals(2, reply.getNumErrors());
+ assertEquals(ErrorCode.FATAL_ERROR, reply.getError(0).getCode());
+ assertEquals(ErrorCode.NO_ADDRESS_FOR_SERVICE, reply.getError(1).getCode());
+ }
+}
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/routing/ResenderTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/routing/ResenderTestCase.java
new file mode 100755
index 00000000000..48993343f38
--- /dev/null
+++ b/messagebus/src/test/java/com/yahoo/messagebus/routing/ResenderTestCase.java
@@ -0,0 +1,200 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.routing;
+
+import com.yahoo.jrt.ListenFailedException;
+import com.yahoo.jrt.slobrok.server.Slobrok;
+import com.yahoo.messagebus.*;
+import com.yahoo.messagebus.Error;
+import com.yahoo.messagebus.network.Identity;
+import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
+import com.yahoo.messagebus.network.rpc.test.TestServer;
+import com.yahoo.messagebus.test.Receptor;
+import com.yahoo.messagebus.test.SimpleMessage;
+import com.yahoo.messagebus.test.SimpleProtocol;
+
+import java.net.UnknownHostException;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public class ResenderTestCase extends junit.framework.TestCase {
+
+ ////////////////////////////////////////////////////////////////////////////////
+ //
+ // Setup
+ //
+ ////////////////////////////////////////////////////////////////////////////////
+
+ Slobrok slobrok;
+ TestServer srcServer, dstServer;
+ SourceSession srcSession;
+ DestinationSession dstSession;
+ RetryTransientErrorsPolicy retryPolicy;
+
+ @Override
+ public void setUp() throws ListenFailedException, UnknownHostException {
+ slobrok = new Slobrok();
+ dstServer = new TestServer(new MessageBusParams().addProtocol(new SimpleProtocol()),
+ new RPCNetworkParams().setIdentity(new Identity("dst")).setSlobrokConfigId(TestServer.getSlobrokConfig(slobrok)));
+ dstSession = dstServer.mb.createDestinationSession(new DestinationSessionParams().setName("session").setMessageHandler(new Receptor()));
+ retryPolicy = new RetryTransientErrorsPolicy();
+ retryPolicy.setBaseDelay(0);
+ srcServer = new TestServer(new MessageBusParams().setRetryPolicy(retryPolicy).addProtocol(new SimpleProtocol()),
+ new RPCNetworkParams().setSlobrokConfigId(TestServer.getSlobrokConfig(slobrok)));
+ srcSession = srcServer.mb.createSourceSession(
+ new SourceSessionParams().setTimeout(600.0).setReplyHandler(new Receptor()));
+ assertTrue(srcServer.waitSlobrok("dst/session", 1));
+ }
+
+ @Override
+ public void tearDown() {
+ slobrok.stop();
+ dstSession.destroy();
+ dstServer.destroy();
+ srcSession.destroy();
+ srcServer.destroy();
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////
+ //
+ // Tests
+ //
+ ////////////////////////////////////////////////////////////////////////////////
+
+ public void testRetryTag() {
+ assertTrue(srcSession.send(createMessage("msg"), Route.parse("dst/session")).isAccepted());
+ Message msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60);
+ assertNotNull(msg);
+ for (int i = 0; i < 5; ++i) {
+ assertEquals(i, msg.getRetry());
+ assertEquals(true, msg.getRetryEnabled());
+ replyFromDestination(msg, ErrorCode.APP_TRANSIENT_ERROR, 0);
+ assertNotNull(msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60));
+ }
+ dstSession.acknowledge(msg);
+ Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60);
+ assertNotNull(reply);
+ assertFalse(reply.hasErrors());
+ assertNull(((Receptor)dstSession.getMessageHandler()).getMessage(0));
+ System.out.println(reply.getTrace());
+ }
+
+ public void testRetryEnabledTag() {
+ Message msg = createMessage("msg");
+ msg.setRetryEnabled(false);
+ assertTrue(srcSession.send(msg, Route.parse("dst/session")).isAccepted());
+ assertNotNull(msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60));
+ assertEquals(false, msg.getRetryEnabled());
+ replyFromDestination(msg, ErrorCode.APP_TRANSIENT_ERROR, 0);
+ Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60);
+ assertNotNull(reply);
+ assertTrue(reply.hasErrors());
+ assertNull(((Receptor)dstSession.getMessageHandler()).getMessage(0));
+ System.out.println(reply.getTrace());
+ }
+
+ public void testTransientError() {
+ assertTrue(srcSession.send(createMessage("msg"), Route.parse("dst/session")).isAccepted());
+ Message msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60);
+ assertNotNull(msg);
+ replyFromDestination(msg, ErrorCode.APP_TRANSIENT_ERROR, 0);
+ assertNotNull(msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60));
+ replyFromDestination(msg, ErrorCode.APP_FATAL_ERROR, 0);
+ Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60);
+ assertNotNull(reply);
+ assertTrue(reply.hasFatalErrors());
+ assertNull(((Receptor)dstSession.getMessageHandler()).getMessage(0));
+ }
+
+ public void testFatalError() {
+ assertTrue(srcSession.send(createMessage("msg"), Route.parse("dst/session")).isAccepted());
+ Message msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60);
+ assertNotNull(msg);
+ replyFromDestination(msg, ErrorCode.APP_FATAL_ERROR, 0);
+ Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60);
+ assertNotNull(reply);
+ assertTrue(reply.hasFatalErrors());
+ assertNull(((Receptor)dstSession.getMessageHandler()).getMessage(0));
+ }
+
+ public void testDisableRetry() {
+ retryPolicy.setEnabled(false);
+ assertTrue(srcSession.send(createMessage("msg"), Route.parse("dst/session")).isAccepted());
+ Message msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60);
+ assertNotNull(msg);
+ replyFromDestination(msg, ErrorCode.APP_TRANSIENT_ERROR, 0);
+ Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60);
+ assertNotNull(reply);
+ assertTrue(reply.hasErrors());
+ assertTrue(!reply.hasFatalErrors());
+ assertNull(((Receptor)dstSession.getMessageHandler()).getMessage(0));
+ }
+
+ public void testRetryDelay() {
+ retryPolicy.setBaseDelay(0.01);
+ assertTrue(srcSession.send(createMessage("msg"), Route.parse("dst/session")).isAccepted());
+ Message msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60);
+ assertNotNull(msg);
+ for (int i = 0; i < 5; ++i) {
+ replyFromDestination(msg, ErrorCode.APP_TRANSIENT_ERROR, -1);
+ assertNotNull(msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60));
+ }
+ replyFromDestination(msg, ErrorCode.APP_FATAL_ERROR, 0);
+ Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60);
+ assertNotNull(reply);
+ assertTrue(reply.hasFatalErrors());
+ assertNull(((Receptor)dstSession.getMessageHandler()).getMessage(0));
+
+ String trace = reply.getTrace().toString();
+ assertTrue(trace.contains("retry 1 in 0.01"));
+ assertTrue(trace.contains("retry 2 in 0.02"));
+ assertTrue(trace.contains("retry 3 in 0.03"));
+ assertTrue(trace.contains("retry 4 in 0.04"));
+ assertTrue(trace.contains("retry 5 in 0.05"));
+ }
+
+ public void testRequestRetryDelay() {
+ assertTrue(srcSession.send(createMessage("msg"), Route.parse("dst/session")).isAccepted());
+ Message msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60);
+ assertNotNull(msg);
+ for (int i = 0; i < 5; ++i) {
+ replyFromDestination(msg, ErrorCode.APP_TRANSIENT_ERROR, i / 50.0);
+ assertNotNull(msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60));
+ }
+ replyFromDestination(msg, ErrorCode.APP_FATAL_ERROR, 0);
+ Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60);
+ assertNotNull(reply);
+ assertTrue(reply.hasFatalErrors());
+ assertNull(((Receptor)dstSession.getMessageHandler()).getMessage(0));
+
+ String trace = reply.getTrace().toString();
+ System.out.println(trace);
+ assertTrue(trace.contains("retry 1 in 0"));
+ assertTrue(trace.contains("retry 2 in 0.02"));
+ assertTrue(trace.contains("retry 3 in 0.04"));
+ assertTrue(trace.contains("retry 4 in 0.06"));
+ assertTrue(trace.contains("retry 5 in 0.08"));
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////
+ //
+ // Utilities
+ //
+ ////////////////////////////////////////////////////////////////////////////////
+
+ private static Message createMessage(String msg) {
+ SimpleMessage ret = new SimpleMessage(msg);
+ ret.getTrace().setLevel(9);
+ return ret;
+ }
+
+ private void replyFromDestination(Message msg, int errorCode, double retryDelay) {
+ Reply reply = new EmptyReply();
+ reply.swapState(msg);
+ if (errorCode != ErrorCode.NONE) {
+ reply.addError(new Error(errorCode, "err"));
+ }
+ reply.setRetryDelay(retryDelay);
+ dstSession.reply(reply);
+ }
+}
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/routing/RetryPolicyTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/routing/RetryPolicyTestCase.java
new file mode 100644
index 00000000000..ad13f3325c7
--- /dev/null
+++ b/messagebus/src/test/java/com/yahoo/messagebus/routing/RetryPolicyTestCase.java
@@ -0,0 +1,32 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.routing;
+
+import com.yahoo.messagebus.ErrorCode;
+import junit.framework.TestCase;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public class RetryPolicyTestCase extends TestCase {
+
+ public void testSimpleRetryPolicy() {
+ RetryTransientErrorsPolicy policy = new RetryTransientErrorsPolicy();
+ for (int i = 0; i < 5; ++i) {
+ double delay = i / 3.0;
+ policy.setBaseDelay(delay);
+ for (int j = 0; j < 5; ++j) {
+ assertEquals((int)(j * delay), (int)policy.getRetryDelay(j));
+ }
+ for (int j = ErrorCode.NONE; j < ErrorCode.ERROR_LIMIT; ++j) {
+ policy.setEnabled(true);
+ if (j < ErrorCode.FATAL_ERROR) {
+ assertTrue(policy.canRetry(j));
+ } else {
+ assertFalse(policy.canRetry(j));
+ }
+ policy.setEnabled(false);
+ assertFalse(policy.canRetry(j));
+ }
+ }
+ }
+}
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/routing/RouteParserTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/routing/RouteParserTestCase.java
new file mode 100755
index 00000000000..e4d120271bc
--- /dev/null
+++ b/messagebus/src/test/java/com/yahoo/messagebus/routing/RouteParserTestCase.java
@@ -0,0 +1,168 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.routing;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public class RouteParserTestCase extends junit.framework.TestCase {
+
+ public void testHopParser() {
+ Hop hop = Hop.parse("foo");
+ assertNotNull(hop);
+ assertEquals(1, hop.getNumDirectives());
+ assertVerbatimDirective(hop.getDirective(0), "foo");
+
+ assertNotNull(hop = Hop.parse("foo/bar"));
+ assertEquals(2, hop.getNumDirectives());
+ assertVerbatimDirective(hop.getDirective(0), "foo");
+ assertVerbatimDirective(hop.getDirective(1), "bar");
+
+ assertNotNull(hop = Hop.parse("tcp/foo:666/bar"));
+ assertEquals(1, hop.getNumDirectives());
+ assertTcpDirective(hop.getDirective(0), "foo", 666, "bar");
+
+ assertNotNull(hop = Hop.parse("route:foo"));
+ assertEquals(1, hop.getNumDirectives());
+ assertRouteDirective(hop.getDirective(0), "foo");
+
+ assertNotNull(hop = Hop.parse("[Extern:tcp/localhost:3619;foo/bar]"));
+ assertEquals(1, hop.getNumDirectives());
+ assertPolicyDirective(hop.getDirective(0), "Extern","tcp/localhost:3619;foo/bar");
+
+ assertNotNull(hop = Hop.parse("[AND:foo bar]"));
+ assertEquals(1, hop.getNumDirectives());
+ assertPolicyDirective(hop.getDirective(0), "AND","foo bar");
+
+ assertNotNull(hop = Hop.parse("[DocumentRouteSelector:raw:route[2]\n" +
+ "route[0].name \"foo\"\n" +
+ "route[0].selector \"testdoc\"\n" +
+ "route[0].feed \"myfeed\"\n" +
+ "route[1].name \"bar\"\n" +
+ "route[1].selector \"other\"\n" +
+ "route[1].feed \"myfeed\"\n" +
+ "]"));
+ assertEquals(1, hop.getNumDirectives());
+ assertPolicyDirective(hop.getDirective(0), "DocumentRouteSelector",
+ "raw:route[2]\n" +
+ "route[0].name \"foo\"\n" +
+ "route[0].selector \"testdoc\"\n" +
+ "route[0].feed \"myfeed\"\n" +
+ "route[1].name \"bar\"\n" +
+ "route[1].selector \"other\"\n" +
+ "route[1].feed \"myfeed\"");
+
+ assertNotNull(hop = Hop.parse("[DocumentRouteSelector:raw:route[1]\n" +
+ "route[0].name \"docproc/cluster.foo\"\n" +
+ "route[0].selector \"testdoc\"\n" +
+ "route[0].feed \"myfeed\"" +
+ "]"));
+ assertEquals(1, hop.getNumDirectives());
+ assertPolicyDirective(hop.getDirective(0), "DocumentRouteSelector",
+ "raw:route[1]\n" +
+ "route[0].name \"docproc/cluster.foo\"\n" +
+ "route[0].selector \"testdoc\"\n" +
+ "route[0].feed \"myfeed\"");
+ }
+
+ public void testHopParserErrors() {
+ assertError(Hop.parse(""), "Failed to parse empty string.");
+ assertError(Hop.parse("[foo"), "Unterminated '[' in '[foo'");
+ assertError(Hop.parse("foo/[bar]]"), "Unexpected token ']' in 'foo/[bar]]'");
+ assertError(Hop.parse("foo bar"), "Failed to completely parse 'foo bar'.");
+ }
+
+ public void testShortRoute() {
+ Route shortRoute = Route.parse("c");
+ assertNotNull(shortRoute);
+ assertEquals(1, shortRoute.getNumHops());
+ Hop hop = shortRoute.getHop(0);
+ assertNotNull(hop);
+ assertEquals(1, hop.getNumDirectives());
+ assertVerbatimDirective(hop.getDirective(0), "c");
+ }
+
+ public void testShortHops() {
+ Route shortRoute = Route.parse("a b c");
+ assertNotNull(shortRoute);
+ assertEquals(3, shortRoute.getNumHops());
+ Hop hop = shortRoute.getHop(0);
+ assertNotNull(hop);
+ assertEquals(1, hop.getNumDirectives());
+ assertVerbatimDirective(hop.getDirective(0), "a");
+ }
+
+ public void testRouteParser() {
+ Route route = Route.parse("foo bar/baz");
+ assertNotNull(route);
+ assertEquals(2, route.getNumHops());
+ Hop hop = route.getHop(0);
+ assertNotNull(hop);
+ assertEquals(1, hop.getNumDirectives());
+ assertVerbatimDirective(hop.getDirective(0), "foo");
+ assertNotNull(hop = route.getHop(1));
+ assertEquals(2, hop.getNumDirectives());
+ assertVerbatimDirective(hop.getDirective(0), "bar");
+ assertVerbatimDirective(hop.getDirective(1), "baz");
+
+ assertNotNull(route = Route.parse("[Extern:tcp/localhost:3633;itr/session] default"));
+ assertEquals(2, route.getNumHops());
+ assertNotNull(hop = route.getHop(0));
+ assertEquals(1, hop.getNumDirectives());
+ assertPolicyDirective(hop.getDirective(0), "Extern", "tcp/localhost:3633;itr/session");
+ assertNotNull(hop = route.getHop(1));
+ assertEquals(1, hop.getNumDirectives());
+ assertVerbatimDirective(hop.getDirective(0), "default");
+ }
+
+ public void testRouteParserErrors() {
+ assertError(Route.parse(""), "Failed to parse empty string.");
+ assertError(Route.parse("foo [bar"), "Unterminated '[' in '[bar'");
+ assertError(Route.parse("foo bar/[baz]]"), "Unexpected token ']' in 'bar/[baz]]'");
+ }
+
+ private static void assertError(Route route, String msg) {
+ assertNotNull(route);
+ assertEquals(1, route.getNumHops());
+ assertError(route.getHop(0), msg);
+ }
+
+ private static void assertError(Hop hop, String msg) {
+ assertNotNull(hop);
+ System.out.println(hop.toDebugString());
+ assertEquals(1, hop.getNumDirectives());
+ assertErrorDirective(hop.getDirective(0), msg);
+ }
+
+ private static void assertErrorDirective(HopDirective dir, String msg) {
+ assertNotNull(dir);
+ assertTrue(dir instanceof ErrorDirective);
+ assertEquals(msg, ((ErrorDirective)dir).getMessage());
+ }
+
+ private static void assertPolicyDirective(HopDirective dir, String name, String param) {
+ assertNotNull(dir);
+ assertTrue(dir instanceof PolicyDirective);
+ assertEquals(name, ((PolicyDirective)dir).getName());
+ assertEquals(param, ((PolicyDirective)dir).getParam());
+ }
+
+ private static void assertRouteDirective(HopDirective dir, String name) {
+ assertNotNull(dir);
+ assertTrue(dir instanceof RouteDirective);
+ assertEquals(name, ((RouteDirective)dir).getName());
+ }
+
+ private static void assertTcpDirective(HopDirective dir, String host, int port, String session) {
+ assertNotNull(dir);
+ assertTrue(dir instanceof TcpDirective);
+ assertEquals(host, ((TcpDirective)dir).getHost());
+ assertEquals(port, ((TcpDirective)dir).getPort());
+ assertEquals(session, ((TcpDirective)dir).getSession());
+ }
+
+ private static void assertVerbatimDirective(HopDirective dir, String image) {
+ assertNotNull(dir);
+ assertTrue(dir instanceof VerbatimDirective);
+ assertEquals(image, ((VerbatimDirective)dir).getImage());
+ }
+}
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/routing/RoutingContextTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/routing/RoutingContextTestCase.java
new file mode 100755
index 00000000000..ea217af5b9a
--- /dev/null
+++ b/messagebus/src/test/java/com/yahoo/messagebus/routing/RoutingContextTestCase.java
@@ -0,0 +1,258 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.routing;
+
+import com.yahoo.jrt.ListenFailedException;
+import com.yahoo.jrt.slobrok.server.Slobrok;
+import com.yahoo.messagebus.*;
+import com.yahoo.messagebus.Error;
+import com.yahoo.messagebus.network.Identity;
+import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
+import com.yahoo.messagebus.network.rpc.test.TestServer;
+import com.yahoo.messagebus.test.Receptor;
+import com.yahoo.messagebus.test.SimpleMessage;
+import com.yahoo.messagebus.test.SimpleProtocol;
+
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public class RoutingContextTestCase extends junit.framework.TestCase {
+ public static final int TIMEOUT_SECS = 120;
+
+ ////////////////////////////////////////////////////////////////////////////////
+ //
+ // Setup
+ //
+ ////////////////////////////////////////////////////////////////////////////////
+
+ Slobrok slobrok;
+ TestServer srcServer, dstServer;
+ SourceSession srcSession;
+ DestinationSession dstSession;
+
+ @Override
+ public void setUp() throws ListenFailedException, UnknownHostException {
+ slobrok = new Slobrok();
+ dstServer = new TestServer(new MessageBusParams().addProtocol(new SimpleProtocol()),
+ new RPCNetworkParams().setIdentity(new Identity("dst")).setSlobrokConfigId(TestServer.getSlobrokConfig(slobrok)));
+ dstSession = dstServer.mb.createDestinationSession(new DestinationSessionParams().setName("session").setMessageHandler(new Receptor()));
+ srcServer = new TestServer(new MessageBusParams().setRetryPolicy(new RetryTransientErrorsPolicy().setBaseDelay(0)).addProtocol(new SimpleProtocol()),
+ new RPCNetworkParams().setSlobrokConfigId(TestServer.getSlobrokConfig(slobrok)));
+ srcSession = srcServer.mb.createSourceSession(
+ new SourceSessionParams().setTimeout(600.0).setReplyHandler(new Receptor()));
+ assertTrue(srcServer.waitSlobrok("dst/session", 1));
+ }
+
+ @Override
+ public void tearDown() {
+ slobrok.stop();
+ dstSession.destroy();
+ dstServer.destroy();
+ srcSession.destroy();
+ srcServer.destroy();
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////
+ //
+ // Tests
+ //
+ ////////////////////////////////////////////////////////////////////////////////
+
+ public void testSingleDirective() {
+ SimpleProtocol protocol = new SimpleProtocol();
+ protocol.addPolicyFactory("Custom", new CustomPolicyFactory(
+ false,
+ Arrays.asList("foo", "bar", "baz/cox"),
+ Arrays.asList("foo", "bar")));
+ srcServer.mb.putProtocol(protocol);
+ srcServer.setupRouting(new RoutingTableSpec(SimpleProtocol.NAME)
+ .addRoute(new RouteSpec("myroute").addHop("myhop"))
+ .addHop(new HopSpec("myhop", "[Custom]")
+ .addRecipient("foo").addRecipient("bar").addRecipient("baz/cox")));
+ for (int i = 0; i < 2; ++i) {
+ assertTrue(srcSession.send(createMessage("msg"), "myroute").isAccepted());
+ Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(TIMEOUT_SECS);
+ assertNotNull(reply);
+ System.out.println(reply.getTrace());
+ assertFalse(reply.hasErrors());
+ }
+ }
+
+ public void testMoreDirectives() {
+ SimpleProtocol protocol = new SimpleProtocol();
+ protocol.addPolicyFactory("Custom", new CustomPolicyFactory(
+ false,
+ Arrays.asList("foo", "foo/bar", "foo/bar0/baz", "foo/bar1/baz", "foo/bar/baz/cox"),
+ Arrays.asList("foo/bar0/baz", "foo/bar1/baz")));
+ srcServer.mb.putProtocol(protocol);
+ srcServer.setupRouting(new RoutingTableSpec(SimpleProtocol.NAME)
+ .addRoute(new RouteSpec("myroute").addHop("myhop"))
+ .addHop(new HopSpec("myhop", "foo/[Custom]/baz")
+ .addRecipient("foo").addRecipient("foo/bar")
+ .addRecipient("foo/bar0/baz").addRecipient("foo/bar1/baz")
+ .addRecipient("foo/bar/baz/cox")));
+ for (int i = 0; i < 2; ++i) {
+ assertTrue(srcSession.send(createMessage("msg"), "myroute").isAccepted());
+ Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(TIMEOUT_SECS);
+ assertNotNull(reply);
+ System.out.println(reply.getTrace());
+ assertFalse(reply.hasErrors());
+ }
+ }
+
+ public void testRecipientsRemain() {
+ SimpleProtocol protocol = new SimpleProtocol();
+ protocol.addPolicyFactory("First", new CustomPolicyFactory(true, Arrays.asList("foo/bar"), Arrays.asList("foo/[Second]")));
+ protocol.addPolicyFactory("Second", new CustomPolicyFactory(false, Arrays.asList("foo/bar"), Arrays.asList("foo/bar")));
+ srcServer.mb.putProtocol(protocol);
+ srcServer.setupRouting(new RoutingTableSpec(SimpleProtocol.NAME)
+ .addRoute(new RouteSpec("myroute").addHop("myhop"))
+ .addHop(new HopSpec("myhop", "[First]/[Second]")
+ .addRecipient("foo/bar")));
+ for (int i = 0; i < 2; ++i) {
+ assertTrue(srcSession.send(createMessage("msg"), "myroute").isAccepted());
+ Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(TIMEOUT_SECS);
+ assertNotNull(reply);
+ System.out.println(reply.getTrace());
+ assertFalse(reply.hasErrors());
+ }
+ }
+
+ public void testConstRoute() {
+ SimpleProtocol protocol = new SimpleProtocol();
+ protocol.addPolicyFactory("DocumentRouteSelector",
+ new CustomPolicyFactory(true, Arrays.asList("dst"), Arrays.asList("dst")));
+ srcServer.mb.putProtocol(protocol);
+ srcServer.setupRouting(new RoutingTableSpec(SimpleProtocol.NAME)
+ .addRoute(new RouteSpec("default").addHop("indexing"))
+ .addHop(new HopSpec("indexing", "[DocumentRouteSelector]").addRecipient("dst"))
+ .addHop(new HopSpec("dst", "dst/session")));
+ for (int i = 0; i < 2; ++i) {
+ assertTrue(srcSession.send(createMessage("msg"), Route.parse("route:default")).isAccepted());
+ Message msg = ((Receptor)dstSession.getMessageHandler()).getMessage(TIMEOUT_SECS);
+ assertNotNull(msg);
+ dstSession.acknowledge(msg);
+ Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(TIMEOUT_SECS);
+ assertNotNull(reply);
+ System.out.println(reply.getTrace());
+ assertFalse(reply.hasErrors());
+ }
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////
+ //
+ // Utilities
+ //
+ ////////////////////////////////////////////////////////////////////////////////
+
+ private Message createMessage(String msg) {
+ Message ret = new SimpleMessage(msg);
+ ret.getTrace().setLevel(9);
+ return ret;
+ }
+
+ private static class CustomPolicyFactory implements SimpleProtocol.PolicyFactory {
+
+ final boolean forward;
+ final List<String> expectedAll;
+ final List<String> expectedMatched;
+
+ public CustomPolicyFactory(boolean forward, List<String> all, List<String> matched) {
+ this.forward = forward;
+ this.expectedAll = all;
+ this.expectedMatched = matched;
+ }
+
+ public RoutingPolicy create(String param) {
+ return new CustomPolicy(this);
+ }
+ }
+
+ private static class CustomPolicy implements RoutingPolicy {
+
+ CustomPolicyFactory factory;
+
+ public CustomPolicy(CustomPolicyFactory factory) {
+ this.factory = factory;
+ }
+
+ public void select(RoutingContext ctx) {
+ Reply reply = new EmptyReply();
+ reply.getTrace().setLevel(9);
+
+ List<Route> recipients = ctx.getAllRecipients();
+ if (factory.expectedAll.size() == recipients.size()) {
+ ctx.trace(1, "Got " + recipients.size() + " expected recipients.");
+ for (Route route : recipients) {
+ if (factory.expectedAll.contains(route.toString())) {
+ ctx.trace(1, "Got expected recipient '" + route + "'.");
+ } else {
+ reply.addError(new Error(ErrorCode.APP_FATAL_ERROR,
+ "Recipient '" + route + "' not expected."));
+ }
+ }
+ } else {
+ reply.addError(new Error(ErrorCode.APP_FATAL_ERROR,
+ "Expected " + factory.expectedAll.size() + " recipients, got " + recipients.size() + "."));
+ }
+
+ if (ctx.getNumRecipients() == recipients.size()) {
+ for (int i = 0; i < recipients.size(); ++i) {
+ if (recipients.get(i) == ctx.getRecipient(i)) {
+ ctx.trace(1, "getRecipient(" + i + ") matches getAllRecipients().get(" + i + ")");
+ } else {
+ reply.addError(new Error(ErrorCode.APP_FATAL_ERROR,
+ "getRecipient(" + i + ") differs from getAllRecipients().get(" + i + ")"));
+ }
+ }
+ } else {
+ reply.addError(new Error(ErrorCode.APP_FATAL_ERROR,
+ "getNumRecipients() differs from getAllRecipients().size()"));
+ }
+
+ recipients = ctx.getMatchedRecipients();
+ if (factory.expectedMatched.size() == recipients.size()) {
+ ctx.trace(1, "Got " + recipients.size() + " matched recipients.");
+ for (Route route : recipients) {
+ if (factory.expectedMatched.contains(route.toString())) {
+ ctx.trace(1, "Got matched recipient '" + route + "'.");
+ } else {
+ reply.addError(new Error(ErrorCode.APP_FATAL_ERROR,
+ "Matched recipient '" + route + "' not expected."));
+ }
+ }
+ } else {
+ reply.addError(new Error(ErrorCode.APP_FATAL_ERROR,
+ "Expected " + factory.expectedAll.size() + " matched recipients, got " + recipients.size() + "."));
+ }
+
+ if (!reply.hasErrors() && factory.forward) {
+ for (Route route : recipients) {
+ ctx.addChild(route);
+ }
+ } else {
+ ctx.setReply(reply);
+ }
+ }
+
+ public void merge(RoutingContext ctx) {
+ Reply ret = new EmptyReply();
+ for (RoutingNodeIterator it = ctx.getChildIterator();
+ it.isValid(); it.next()) {
+ Reply reply = it.getReplyRef();
+ for (int i = 0; i < reply.getNumErrors(); ++i) {
+ ret.addError(reply.getError(i));
+ }
+ }
+ ctx.setReply(ret);
+ }
+
+ @Override
+ public void destroy() {
+ }
+ }
+
+}
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/routing/RoutingSpecTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/routing/RoutingSpecTestCase.java
new file mode 100755
index 00000000000..3cbb9d86e44
--- /dev/null
+++ b/messagebus/src/test/java/com/yahoo/messagebus/routing/RoutingSpecTestCase.java
@@ -0,0 +1,336 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.routing;
+
+import com.yahoo.messagebus.ConfigAgent;
+import com.yahoo.messagebus.ConfigHandler;
+import junit.framework.TestCase;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public class RoutingSpecTestCase extends TestCase {
+
+ ////////////////////////////////////////////////////////////////////////////////
+ //
+ // Tests
+ //
+ ////////////////////////////////////////////////////////////////////////////////
+
+ public void testConfig() {
+ assertConfig(new RoutingSpec());
+ assertConfig(new RoutingSpec().addTable(new RoutingTableSpec("mytable1")));
+ assertConfig(new RoutingSpec().addTable(new RoutingTableSpec("mytable1")
+ .addHop(new HopSpec("myhop1", "myselector1"))));
+ assertConfig(new RoutingSpec().addTable(new RoutingTableSpec("mytable1")
+ .addHop(new HopSpec("myhop1", "myselector1"))
+ .addRoute(new RouteSpec("myroute1").addHop("myhop1"))));
+ assertConfig(new RoutingSpec().addTable(new RoutingTableSpec("mytable1")
+ .addHop(new HopSpec("myhop1", "myselector1"))
+ .addHop(new HopSpec("myhop2", "myselector2"))
+ .addRoute(new RouteSpec("myroute1").addHop("myhop1"))
+ .addRoute(new RouteSpec("myroute2").addHop("myhop2"))
+ .addRoute(new RouteSpec("myroute12").addHop("myhop1").addHop("myhop2"))));
+ assertConfig(new RoutingSpec().addTable(new RoutingTableSpec("mytable1")
+ .addHop(new HopSpec("myhop1", "myselector1"))
+ .addHop(new HopSpec("myhop2", "myselector2"))
+ .addRoute(new RouteSpec("myroute1").addHop("myhop1"))
+ .addRoute(new RouteSpec("myroute2").addHop("myhop2"))
+ .addRoute(new RouteSpec("myroute12").addHop("myhop1").addHop("myhop2")))
+ .addTable(new RoutingTableSpec("mytable2")));
+ assertEquals("routingtable[2]\n" +
+ "routingtable[0].protocol \"mytable1\"\n" +
+ "routingtable[1].protocol \"mytable2\"\n" +
+ "routingtable[1].hop[3]\n" +
+ "routingtable[1].hop[0].name \"myhop1\"\n" +
+ "routingtable[1].hop[0].selector \"myselector1\"\n" +
+ "routingtable[1].hop[1].name \"myhop2\"\n" +
+ "routingtable[1].hop[1].selector \"myselector2\"\n" +
+ "routingtable[1].hop[1].ignoreresult true\n" +
+ "routingtable[1].hop[2].name \"myhop1\"\n" +
+ "routingtable[1].hop[2].selector \"myselector3\"\n" +
+ "routingtable[1].hop[2].recipient[2]\n" +
+ "routingtable[1].hop[2].recipient[0] \"myrecipient1\"\n" +
+ "routingtable[1].hop[2].recipient[1] \"myrecipient2\"\n" +
+ "routingtable[1].route[1]\n" +
+ "routingtable[1].route[0].name \"myroute1\"\n" +
+ "routingtable[1].route[0].hop[1]\n" +
+ "routingtable[1].route[0].hop[0] \"myhop1\"\n",
+ new RoutingSpec()
+ .addTable(new RoutingTableSpec("mytable1"))
+ .addTable(new RoutingTableSpec("mytable2")
+ .addHop(new HopSpec("myhop1", "myselector1"))
+ .addHop(new HopSpec("myhop2", "myselector2").setIgnoreResult(true))
+ .addHop(new HopSpec("myhop1", "myselector3")
+ .addRecipient("myrecipient1")
+ .addRecipient("myrecipient2"))
+ .addRoute(new RouteSpec("myroute1").addHop("myhop1"))).toString());
+ }
+
+ public void testApplicationSpec() {
+ assertApplicationSpec(Arrays.asList("foo"),
+ Arrays.asList("foo",
+ "*"));
+ assertApplicationSpec(Arrays.asList("foo/bar"),
+ Arrays.asList("foo/bar",
+ "foo/*",
+ "*/bar",
+ "*/*"));
+ assertApplicationSpec(Arrays.asList("foo/0/baz",
+ "foo/1/baz",
+ "foo/2/baz"),
+ Arrays.asList("foo/0/baz",
+ "foo/1/baz",
+ "foo/2/baz",
+ "foo/0/*",
+ "foo/1/*",
+ "foo/2/*",
+ "foo/*/baz",
+ "*/0/baz",
+ "*/1/baz",
+ "*/2/baz",
+ "foo/*/*",
+ "*/0/*",
+ "*/1/*",
+ "*/2/*",
+ "*/*/baz",
+ "*/*/*"));
+ }
+
+ public void testVeriyfOk() {
+ assertVerifyOk(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
+ .addHop(new HopSpec("hop1", "myservice1"))),
+ new ApplicationSpec().addService("mytable", "myservice1"));
+ assertVerifyOk(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
+ .addRoute(new RouteSpec("route1").addHop("myservice1"))),
+ new ApplicationSpec().addService("mytable", "myservice1"));
+ assertVerifyOk(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
+ .addHop(new HopSpec("hop1", "myservice1"))
+ .addRoute(new RouteSpec("route1").addHop("hop1"))),
+ new ApplicationSpec().addService("mytable", "myservice1"));
+ assertVerifyOk(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
+ .addHop(new HopSpec("hop1", "route:route2"))
+ .addHop(new HopSpec("hop2", "myservice1"))
+ .addRoute(new RouteSpec("route1").addHop("hop1"))
+ .addRoute(new RouteSpec("route2").addHop("hop2"))),
+ new ApplicationSpec().addService("mytable", "myservice1"));
+ assertVerifyOk(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
+ .addHop(new HopSpec("myhop1", "foo/[bar]/baz").addRecipient("foo/0/baz").addRecipient("foo/1/baz"))),
+ new ApplicationSpec()
+ .addService("mytable", "foo/0/baz")
+ .addService("mytable", "foo/1/baz"));
+ }
+
+ public void testVerifyToggle() {
+ assertVerifyOk(new RoutingSpec(false)
+ .addTable(new RoutingTableSpec("mytable"))
+ .addTable(new RoutingTableSpec("mytable")),
+ new ApplicationSpec());
+ assertVerifyOk(new RoutingSpec().addTable(new RoutingTableSpec("mytable", false)
+ .addHop(new HopSpec("foo", "bar"))
+ .addHop(new HopSpec("foo", "baz"))),
+ new ApplicationSpec());
+ assertVerifyOk(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
+ .addHop(new HopSpec("foo", "", false))),
+ new ApplicationSpec());
+ assertVerifyOk(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
+ .addRoute(new RouteSpec("foo", false))),
+ new ApplicationSpec());
+ }
+
+ public void testVerifyFail() {
+ // Duplicate table.
+ assertVerifyFail(new RoutingSpec()
+ .addTable(new RoutingTableSpec("mytable"))
+ .addTable(new RoutingTableSpec("mytable")),
+ new ApplicationSpec(),
+ Arrays.asList("Routing table 'mytable' is defined 2 times."));
+
+ // Duplicate hop.
+ assertVerifyFail(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
+ .addHop(new HopSpec("foo", "bar"))
+ .addHop(new HopSpec("foo", "baz"))),
+ new ApplicationSpec()
+ .addService("mytable", "bar")
+ .addService("mytable", "baz"),
+ Arrays.asList("Hop 'foo' in routing table 'mytable' is defined 2 times."));
+
+ // Duplicate route.
+ assertVerifyFail(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
+ .addRoute(new RouteSpec("foo").addHop("bar"))
+ .addRoute(new RouteSpec("foo").addHop("baz"))),
+ new ApplicationSpec()
+ .addService("mytable", "bar")
+ .addService("mytable", "baz"),
+ Arrays.asList("Route 'foo' in routing table 'mytable' is defined 2 times."));
+
+ // Empty hop.
+ assertVerifyFail(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
+ .addHop(new HopSpec("foo", ""))),
+ new ApplicationSpec(),
+ Arrays.asList("For hop 'foo' in routing table 'mytable'; Failed to parse empty string."));
+
+ // Empty route.
+ assertVerifyFail(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
+ .addRoute(new RouteSpec("foo"))),
+ new ApplicationSpec(),
+ Arrays.asList("Route 'foo' in routing table 'mytable' has no hops."));
+
+ // Hop error.
+ assertVerifyFail(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
+ .addHop(new HopSpec("foo", "bar/baz cox"))),
+ new ApplicationSpec(),
+ Arrays.asList("For hop 'foo' in routing table 'mytable'; Failed to completely parse 'bar/baz cox'."));
+
+ // Hop error in recipient.
+ assertVerifyFail(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
+ .addHop(new HopSpec("foo", "[bar]").addRecipient("bar/baz cox"))),
+ new ApplicationSpec(),
+ Arrays.asList("For recipient 'bar/baz cox' in hop 'foo' in routing table 'mytable'; Failed to completely parse 'bar/baz cox'."));
+
+ // Hop error in route.
+ assertVerifyFail(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
+ .addRoute(new RouteSpec("foo").addHop("bar/baz cox"))),
+ new ApplicationSpec(),
+ Arrays.asList("For hop 1 in route 'foo' in routing table 'mytable'; Failed to completely parse 'bar/baz cox'."));
+
+ // Hop not found.
+ assertVerifyFail(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
+ .addRoute(new RouteSpec("foo").addHop("bar"))),
+ new ApplicationSpec(),
+ Arrays.asList("Hop 1 in route 'foo' in routing table 'mytable' references 'bar' which is neither a service, a route nor another hop."));
+
+ // Mismatched recipient.
+ assertVerifyFail(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
+ .addHop(new HopSpec("foo", "bar/[baz]/cox").addRecipient("cox/0/bar"))),
+ new ApplicationSpec(),
+ Arrays.asList("Selector 'bar/[baz]/cox' does not match recipient 'cox/0/bar' in hop 'foo' in routing table 'mytable'."));
+
+ // Route not found.
+ assertVerifyFail(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
+ .addHop(new HopSpec("foo", "route:bar"))),
+ new ApplicationSpec(),
+ Arrays.asList("Hop 'foo' in routing table 'mytable' references route 'bar' which does not exist."));
+
+ // Route not found in route.
+ assertVerifyFail(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
+ .addRoute(new RouteSpec("foo").addHop("route:bar"))),
+ new ApplicationSpec(),
+ Arrays.asList("Hop 1 in route 'foo' in routing table 'mytable' references route 'bar' which does not exist."));
+
+ // Service not found.
+ assertVerifyFail(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
+ .addHop(new HopSpec("foo", "bar/baz"))),
+ new ApplicationSpec(),
+ Arrays.asList("Hop 'foo' in routing table 'mytable' references 'bar/baz' which is neither a service, a route nor another hop."));
+
+ // Unexpected recipient.
+ assertVerifyFail(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
+ .addHop(new HopSpec("foo", "bar").addRecipient("baz"))),
+ new ApplicationSpec()
+ .addService("mytable", "bar")
+ .addService("mytable", "baz"),
+ Arrays.asList("Hop 'foo' in routing table 'mytable' has recipients but no policy directive."));
+
+ // Multiple errors.
+ assertVerifyFail(new RoutingSpec()
+ .addTable(new RoutingTableSpec("mytable"))
+ .addTable(new RoutingTableSpec("mytable")
+ .addHop(new HopSpec("hop1", "bar"))
+ .addHop(new HopSpec("hop1", "baz"))
+ .addHop(new HopSpec("hop2", ""))
+ .addHop(new HopSpec("hop3", "bar/baz cox"))
+ .addHop(new HopSpec("hop4", "[bar]").addRecipient("bar/baz cox"))
+ .addHop(new HopSpec("hop5", "bar/[baz]/cox").addRecipient("cox/0/bar"))
+ .addHop(new HopSpec("hop6", "route:route69"))
+ .addHop(new HopSpec("hop7", "bar/baz"))
+ .addHop(new HopSpec("hop8", "bar").addRecipient("baz"))
+ .addRoute(new RouteSpec("route1").addHop("bar"))
+ .addRoute(new RouteSpec("route1").addHop("baz"))
+ .addRoute(new RouteSpec("route2").addHop(""))
+ .addRoute(new RouteSpec("route3").addHop("bar/baz cox"))
+ .addRoute(new RouteSpec("route4").addHop("hop69"))
+ .addRoute(new RouteSpec("route5").addHop("route:route69"))),
+ new ApplicationSpec()
+ .addService("mytable", "bar")
+ .addService("mytable", "baz"),
+ Arrays.asList("Routing table 'mytable' is defined 2 times.",
+ "For hop 'hop2' in routing table 'mytable'; Failed to parse empty string.",
+ "For hop 'hop3' in routing table 'mytable'; Failed to completely parse 'bar/baz cox'.",
+ "For hop 1 in route 'route2' in routing table 'mytable'; Failed to parse empty string.",
+ "For hop 1 in route 'route3' in routing table 'mytable'; Failed to completely parse 'bar/baz cox'.",
+ "For recipient 'bar/baz cox' in hop 'hop4' in routing table 'mytable'; Failed to completely parse 'bar/baz cox'.",
+ "Hop 'hop1' in routing table 'mytable' is defined 2 times.",
+ "Hop 'hop6' in routing table 'mytable' references route 'route69' which does not exist.",
+ "Hop 'hop7' in routing table 'mytable' references 'bar/baz' which is neither a service, a route nor another hop.",
+ "Hop 'hop8' in routing table 'mytable' has recipients but no policy directive.",
+ "Hop 1 in route 'route4' in routing table 'mytable' references 'hop69' which is neither a service, a route nor another hop.",
+ "Hop 1 in route 'route5' in routing table 'mytable' references route 'route69' which does not exist.",
+ "Route 'route1' in routing table 'mytable' is defined 2 times.",
+ "Selector 'bar/[baz]/cox' does not match recipient 'cox/0/bar' in hop 'hop5' in routing table 'mytable'."));
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////
+ //
+ // Utilities
+ //
+ ////////////////////////////////////////////////////////////////////////////////
+
+ private static void assertVerifyOk(RoutingSpec routing, ApplicationSpec app) {
+ assertVerifyFail(routing, app, new ArrayList<String>());
+ }
+
+ private static void assertVerifyFail(RoutingSpec routing, ApplicationSpec app, List<String> expectedErrors) {
+ List<String> errors = new ArrayList<>();
+ routing.verify(app, errors);
+
+ Collections.sort(errors);
+ Collections.sort(expectedErrors);
+ assertEquals(expectedErrors.toString(), errors.toString());
+ }
+
+ private static void assertConfig(RoutingSpec routing) {
+ assertEquals(routing, routing);
+ assertEquals(routing, new RoutingSpec(routing));
+
+ ConfigStore store = new ConfigStore();
+ ConfigAgent subscriber = new ConfigAgent("raw:" + routing.toString(), store);
+ subscriber.subscribe();
+ assertTrue(store.routing.equals(routing));
+ }
+
+ private static void assertApplicationSpec(List<String> services, List<String> patterns) {
+ ApplicationSpec app = new ApplicationSpec();
+ for (String pattern : patterns) {
+ assertFalse(app.isService("foo", pattern));
+ assertFalse(app.isService("bar", pattern));
+ }
+ for (String service : services) {
+ app.addService("foo", service);
+ }
+ for (String pattern : patterns) {
+ assertTrue(app.isService("foo", pattern));
+ assertFalse(app.isService("bar", pattern));
+ }
+ for (String service : services) {
+ app.addService("bar", service);
+ }
+ for (String pattern : patterns) {
+ assertTrue(app.isService("foo", pattern));
+ assertTrue(app.isService("bar", pattern));
+ }
+ }
+
+ private static class ConfigStore implements ConfigHandler {
+
+ RoutingSpec routing = null;
+
+ public void setupRouting(RoutingSpec routing) {
+ this.routing = routing;
+ }
+ }
+}
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/routing/RoutingTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/routing/RoutingTestCase.java
new file mode 100644
index 00000000000..911350d3cc9
--- /dev/null
+++ b/messagebus/src/test/java/com/yahoo/messagebus/routing/RoutingTestCase.java
@@ -0,0 +1,1144 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.routing;
+
+import com.yahoo.component.Vtag;
+import com.yahoo.jrt.ListenFailedException;
+import com.yahoo.jrt.slobrok.server.Slobrok;
+import com.yahoo.messagebus.*;
+import com.yahoo.messagebus.Error;
+import com.yahoo.messagebus.network.Identity;
+import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
+import com.yahoo.messagebus.network.rpc.test.TestServer;
+import com.yahoo.messagebus.routing.test.CustomPolicy;
+import com.yahoo.messagebus.routing.test.CustomPolicyFactory;
+import com.yahoo.messagebus.test.Receptor;
+import com.yahoo.messagebus.test.SimpleMessage;
+import com.yahoo.messagebus.test.SimpleProtocol;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.*;
+
+/**
+ * @author <a href="mailto:havardpe@yahoo-inc.com">Haavard Pettersen</a>
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public class RoutingTestCase {
+
+ ////////////////////////////////////////////////////////////////////////////////
+ //
+ // Setup
+ //
+ ////////////////////////////////////////////////////////////////////////////////
+
+ Slobrok slobrok;
+ TestServer srcServer, dstServer;
+ SourceSession srcSession;
+ DestinationSession dstSession;
+ RetryTransientErrorsPolicy retryPolicy;
+
+ @Before
+ public void setUp() throws ListenFailedException, UnknownHostException {
+ slobrok = new Slobrok();
+ dstServer = new TestServer(new MessageBusParams().addProtocol(new SimpleProtocol()),
+ new RPCNetworkParams().setIdentity(new Identity("dst")).setSlobrokConfigId(
+ TestServer.getSlobrokConfig(slobrok)));
+ dstSession = dstServer.mb.createDestinationSession(
+ new DestinationSessionParams().setName("session").setMessageHandler(new Receptor()));
+ retryPolicy = new RetryTransientErrorsPolicy();
+ retryPolicy.setBaseDelay(0);
+ srcServer = new TestServer(new MessageBusParams().setRetryPolicy(retryPolicy).addProtocol(new SimpleProtocol()),
+ new RPCNetworkParams().setSlobrokConfigId(TestServer.getSlobrokConfig(slobrok)));
+ srcSession = srcServer.mb.createSourceSession(
+ new SourceSessionParams().setTimeout(600.0).setThrottlePolicy(null).setReplyHandler(new Receptor()));
+ assertTrue(srcServer.waitSlobrok("dst/session", 1));
+ }
+
+ @After
+ public void tearDown() {
+ slobrok.stop();
+ dstSession.destroy();
+ dstServer.destroy();
+ srcSession.destroy();
+ srcServer.destroy();
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////
+ //
+ // Tests
+ //
+ ////////////////////////////////////////////////////////////////////////////////
+
+ @Test
+ public void requireThatNullRouteIsCaught() {
+ assertTrue(srcSession.send(createMessage("msg")).isAccepted());
+ Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60);
+ assertNotNull(reply);
+ System.out.println(reply.getTrace());
+ assertEquals(1, reply.getNumErrors());
+ assertEquals(ErrorCode.ILLEGAL_ROUTE, reply.getError(0).getCode());
+ }
+
+ @Test
+ public void requireThatEmptyRouteIsCaught() {
+ assertTrue(srcSession.send(createMessage("msg"), new Route()).isAccepted());
+ Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60);
+ assertNotNull(reply);
+ System.out.println(reply.getTrace());
+ assertEquals(1, reply.getNumErrors());
+ assertEquals(ErrorCode.ILLEGAL_ROUTE, reply.getError(0).getCode());
+ }
+
+ @Test
+ public void requireThatHopNameIsExpanded() {
+ srcServer.setupRouting(new RoutingTableSpec(SimpleProtocol.NAME)
+ .addHop(new HopSpec("dst", "dst/session")));
+ assertTrue(srcSession.send(createMessage("msg"), Route.parse("dst")).isAccepted());
+ Message msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60);
+ assertNotNull(msg);
+ dstSession.acknowledge(msg);
+ Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60);
+ assertNotNull(reply);
+ System.out.println(reply.getTrace());
+ assertFalse(reply.hasErrors());
+ }
+
+ @Test
+ public void requireThatRouteDirectiveWorks() {
+ srcServer.setupRouting(new RoutingTableSpec(SimpleProtocol.NAME)
+ .addRoute(new RouteSpec("dst").addHop("dst/session"))
+ .addHop(new HopSpec("dir", "route:dst")));
+ assertTrue(srcSession.send(createMessage("msg"), Route.parse("dir")).isAccepted());
+ Message msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60);
+ assertNotNull(msg);
+ dstSession.acknowledge(msg);
+ Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60);
+ assertNotNull(reply);
+ System.out.println(reply.getTrace());
+ assertFalse(reply.hasErrors());
+ }
+
+ @Test
+ public void requireThatRouteNameIsExpanded() {
+ srcServer.setupRouting(new RoutingTableSpec(SimpleProtocol.NAME)
+ .addRoute(new RouteSpec("dst").addHop("dst/session")));
+ assertTrue(srcSession.send(createMessage("msg"), Route.parse("dst")).isAccepted());
+ Message msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60);
+ assertNotNull(msg);
+ dstSession.acknowledge(msg);
+ Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60);
+ assertNotNull(reply);
+ System.out.println(reply.getTrace());
+ assertFalse(reply.hasErrors());
+ }
+
+ @Test
+ public void requireThatHopResolutionOverflowIsCaught() {
+ srcServer.setupRouting(new RoutingTableSpec(SimpleProtocol.NAME)
+ .addHop(new HopSpec("foo", "bar"))
+ .addHop(new HopSpec("bar", "foo")));
+ assertTrue(srcSession.send(createMessage("msg"), Route.parse("foo")).isAccepted());
+ Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60);
+ assertNotNull(reply);
+ System.out.println(reply.getTrace());
+ assertEquals(1, reply.getNumErrors());
+ assertEquals(ErrorCode.ILLEGAL_ROUTE, reply.getError(0).getCode());
+ }
+
+ @Test
+ public void requireThatRouteResolutionOverflowIsCaught() {
+ srcServer.setupRouting(new RoutingTableSpec(SimpleProtocol.NAME)
+ .addRoute(new RouteSpec("foo").addHop("route:foo")));
+ assertTrue(srcSession.send(createMessage("msg"), "foo").isAccepted());
+ Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60);
+ assertNotNull(reply);
+ System.out.println(reply.getTrace());
+ assertEquals(1, reply.getNumErrors());
+ assertEquals(ErrorCode.ILLEGAL_ROUTE, reply.getError(0).getCode());
+ }
+
+ @Test
+ public void requireThatRouteExpansionOnlyReplacesFirstHop() {
+ srcServer.setupRouting(new RoutingTableSpec(SimpleProtocol.NAME)
+ .addRoute(new RouteSpec("foo").addHop("dst/session").addHop("bar")));
+ assertTrue(srcSession.send(createMessage("msg"), Route.parse("route:foo baz")).isAccepted());
+ Message msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60);
+ assertNotNull(msg);
+ assertEquals(2, msg.getRoute().getNumHops());
+ assertEquals("bar", msg.getRoute().getHop(0).toString());
+ assertEquals("baz", msg.getRoute().getHop(1).toString());
+ dstSession.acknowledge(msg);
+ Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60);
+ assertNotNull(reply);
+ System.out.println(reply.getTrace());
+ assertFalse(reply.hasErrors());
+ }
+
+ @Test
+ public void requireThatErrorDirectiveWorks() {
+ Route route = Route.parse("foo/bar/baz");
+ route.getHop(0).setDirective(1, new ErrorDirective("err"));
+ assertTrue(srcSession.send(createMessage("msg"), route).isAccepted());
+ Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60);
+ assertNotNull(reply);
+ System.out.println(reply.getTrace());
+ assertEquals(1, reply.getNumErrors());
+ assertEquals(ErrorCode.ILLEGAL_ROUTE, reply.getError(0).getCode());
+ assertEquals("err", reply.getError(0).getMessage());
+ }
+
+ @Test
+ public void requireThatIllegalSelectIsCaught() {
+ SimpleProtocol protocol = new SimpleProtocol();
+ protocol.addPolicyFactory("Custom", new CustomPolicyFactory());
+ srcServer.mb.putProtocol(protocol);
+ Route route = Route.parse("[Custom: ]");
+ assertNotNull(route);
+ assertTrue(srcSession.send(createMessage("msg"), route).isAccepted());
+ Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60);
+ assertNotNull(reply);
+ System.out.println(reply.getTrace());
+ assertEquals(1, reply.getNumErrors());
+ assertEquals(ErrorCode.NO_SERVICES_FOR_ROUTE, reply.getError(0).getCode());
+ }
+
+ @Test
+ public void requireThatEmptySelectIsCaught() {
+ SimpleProtocol protocol = new SimpleProtocol();
+ protocol.addPolicyFactory("Custom", new CustomPolicyFactory());
+ srcServer.mb.putProtocol(protocol);
+ assertTrue(srcSession.send(createMessage("msg"), Route.parse("[Custom]")).isAccepted());
+ Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60);
+ assertNotNull(reply);
+ System.out.println(reply.getTrace());
+ assertEquals(1, reply.getNumErrors());
+ assertEquals(ErrorCode.NO_SERVICES_FOR_ROUTE, reply.getError(0).getCode());
+ }
+
+ @Test
+ public void requireThatPolicySelectWorks() {
+ SimpleProtocol protocol = new SimpleProtocol();
+ protocol.addPolicyFactory("Custom", new CustomPolicyFactory());
+ srcServer.mb.putProtocol(protocol);
+ assertTrue(srcSession.send(createMessage("msg"), Route.parse("[Custom:dst/session]")).isAccepted());
+ Message msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60);
+ assertNotNull(msg);
+ dstSession.acknowledge(msg);
+ Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60);
+ assertNotNull(reply);
+ System.out.println(reply.getTrace());
+ assertFalse(reply.hasErrors());
+ }
+
+ @Test
+ public void requireThatTransientErrorsAreRetried() {
+ assertTrue(srcSession.send(createMessage("msg"), Route.parse("dst/session")).isAccepted());
+ Message msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60);
+ assertNotNull(msg);
+ Reply reply = new EmptyReply();
+ reply.swapState(msg);
+ reply.addError(new Error(ErrorCode.APP_TRANSIENT_ERROR, "err1"));
+ dstSession.reply(reply);
+ assertNotNull(msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60));
+ reply = new EmptyReply();
+ reply.swapState(msg);
+ reply.addError(new Error(ErrorCode.APP_TRANSIENT_ERROR, "err2"));
+ dstSession.reply(reply);
+ assertNotNull(msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60));
+ dstSession.acknowledge(msg);
+ assertNotNull(reply = ((Receptor)srcSession.getReplyHandler()).getReply(60));
+ System.out.println(reply.getTrace());
+ assertFalse(reply.hasErrors());
+ assertTrace(Arrays.asList("[APP_TRANSIENT_ERROR @ localhost]: err1",
+ "-[APP_TRANSIENT_ERROR @ localhost]: err1",
+ "[APP_TRANSIENT_ERROR @ localhost]: err2",
+ "-[APP_TRANSIENT_ERROR @ localhost]: err2"),
+ reply.getTrace());
+ }
+
+ @Test
+ public void requireThatTransientErrorsAreRetriedWithPolicy() {
+ SimpleProtocol protocol = new SimpleProtocol();
+ protocol.addPolicyFactory("Custom", new CustomPolicyFactory());
+ srcServer.mb.putProtocol(protocol);
+ assertTrue(srcSession.send(createMessage("msg"), Route.parse("[Custom:dst/session]")).isAccepted());
+ Message msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60);
+ assertNotNull(msg);
+ Reply reply = new EmptyReply();
+ reply.swapState(msg);
+ reply.addError(new Error(ErrorCode.APP_TRANSIENT_ERROR, "err1"));
+ dstSession.reply(reply);
+ assertNotNull(msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60));
+ reply = new EmptyReply();
+ reply.swapState(msg);
+ reply.addError(new Error(ErrorCode.APP_TRANSIENT_ERROR, "err2"));
+ dstSession.reply(reply);
+ assertNotNull(msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60));
+ dstSession.acknowledge(msg);
+ assertNotNull(reply = ((Receptor)srcSession.getReplyHandler()).getReply(60));
+ System.out.println(reply.getTrace());
+ assertFalse(reply.hasErrors());
+ assertTrace(Arrays.asList("Source session accepted a 3 byte message. 1 message(s) now pending.",
+ "Running routing policy 'Custom'.",
+ "Selecting [dst/session].",
+ "Component 'dst/session' selected by policy 'Custom'.",
+ "Resolving 'dst/session'.",
+ "Sending message (version ${VERSION}) from client to 'dst/session'",
+ "Message (type 1) received at 'dst' for session 'session'.",
+ "[APP_TRANSIENT_ERROR @ localhost]: err1",
+ "Sending reply (version ${VERSION}) from 'dst'.",
+ "Reply (type 0) received at client.",
+ "Routing policy 'Custom' merging replies.",
+ "Merged [dst/session].",
+ "Message scheduled for retry 1 in 0.0 seconds.",
+ "Resender resending message.",
+ "Running routing policy 'Custom'.",
+ "Selecting [dst/session].",
+ "Component 'dst/session' selected by policy 'Custom'.",
+ "Resolving 'dst/session'.",
+ "Sending message (version ${VERSION}) from client to 'dst/session'",
+ "Message (type 1) received at 'dst' for session 'session'.",
+ "[APP_TRANSIENT_ERROR @ localhost]: err2",
+ "Sending reply (version ${VERSION}) from 'dst'.",
+ "Reply (type 0) received at client.",
+ "Routing policy 'Custom' merging replies.",
+ "Merged [dst/session].",
+ "Message scheduled for retry 2 in 0.0 seconds.",
+ "Resender resending message.",
+ "Running routing policy 'Custom'.",
+ "Selecting [dst/session].",
+ "Component 'dst/session' selected by policy 'Custom'.",
+ "Resolving 'dst/session'.",
+ "Sending message (version ${VERSION}) from client to 'dst/session'",
+ "Message (type 1) received at 'dst' for session 'session'.",
+ "Sending reply (version ${VERSION}) from 'dst'.",
+ "Reply (type 0) received at client.",
+ "Routing policy 'Custom' merging replies.",
+ "Merged [dst/session].",
+ "Source session received reply. 0 message(s) now pending."),
+ reply.getTrace());
+ }
+
+ @Test
+ public void requireThatRetryCanBeDisabled() {
+ retryPolicy.setEnabled(false);
+ assertTrue(srcSession.send(createMessage("msg"), Route.parse("dst/session")).isAccepted());
+ Message msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60);
+ assertNotNull(msg);
+ Reply reply = new EmptyReply();
+ reply.swapState(msg);
+ reply.addError(new Error(ErrorCode.APP_TRANSIENT_ERROR, "err"));
+ dstSession.reply(reply);
+ assertNotNull(reply = ((Receptor)srcSession.getReplyHandler()).getReply(60));
+ System.out.println(reply.getTrace());
+ assertEquals(1, reply.getNumErrors());
+ assertEquals(ErrorCode.APP_TRANSIENT_ERROR, reply.getError(0).getCode());
+ }
+
+ @Test
+ public void requireThatRetryCallsSelect() {
+ SimpleProtocol protocol = new SimpleProtocol();
+ protocol.addPolicyFactory("Custom", new CustomPolicyFactory());
+ srcServer.mb.putProtocol(protocol);
+ assertTrue(srcSession.send(createMessage("msg"), Route.parse("[Custom:dst/session]")).isAccepted());
+ Message msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60);
+ assertNotNull(msg);
+ Reply reply = new EmptyReply();
+ reply.swapState(msg);
+ reply.addError(new Error(ErrorCode.APP_TRANSIENT_ERROR, "err"));
+ dstSession.reply(reply);
+ assertNotNull(msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60));
+ dstSession.acknowledge(msg);
+ assertNotNull(reply = ((Receptor)srcSession.getReplyHandler()).getReply(60));
+ System.out.println(reply.getTrace());
+ assertFalse(reply.hasErrors());
+ assertTrace(Arrays.asList("Selecting [dst/session].",
+ "[APP_TRANSIENT_ERROR @ localhost]",
+ "-[APP_TRANSIENT_ERROR @ localhost]",
+ "Merged [dst/session].",
+ "Selecting [dst/session].",
+ "Sending reply",
+ "Merged [dst/session]."),
+ reply.getTrace());
+ }
+
+ @Test
+ public void requireThatPolicyCanDisableReselectOnRetry() {
+ SimpleProtocol protocol = new SimpleProtocol();
+ protocol.addPolicyFactory("Custom", new CustomPolicyFactory(false));
+ srcServer.mb.putProtocol(protocol);
+ assertTrue(srcSession.send(createMessage("msg"), Route.parse("[Custom:dst/session]")).isAccepted());
+ Message msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60);
+ assertNotNull(msg);
+ Reply reply = new EmptyReply();
+ reply.swapState(msg);
+ reply.addError(new Error(ErrorCode.APP_TRANSIENT_ERROR, "err"));
+ dstSession.reply(reply);
+ assertNotNull(msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60));
+ dstSession.acknowledge(msg);
+ assertNotNull(reply = ((Receptor)srcSession.getReplyHandler()).getReply(60));
+ System.out.println(reply.getTrace());
+ assertFalse(reply.hasErrors());
+ assertTrace(Arrays.asList("Selecting [dst/session].",
+ "[APP_TRANSIENT_ERROR @ localhost]",
+ "-[APP_TRANSIENT_ERROR @ localhost]",
+ "Merged [dst/session].",
+ "-Selecting [dst/session].",
+ "Sending reply",
+ "Merged [dst/session]."),
+ reply.getTrace());
+ }
+
+ @Test
+ public void requireThatPolicyCanConsumeErrors() {
+ SimpleProtocol protocol = new SimpleProtocol();
+ protocol.addPolicyFactory("Custom", new CustomPolicyFactory(true, ErrorCode.NO_ADDRESS_FOR_SERVICE));
+ srcServer.mb.putProtocol(protocol);
+ retryPolicy.setEnabled(false);
+ assertTrue(srcSession.send(createMessage("msg"), Route.parse("[Custom:dst/session,dst/unknown]")).isAccepted());
+ Message msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60);
+ assertNotNull(msg);
+ dstSession.acknowledge(msg);
+ Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60);
+ assertNotNull(reply);
+ System.out.println(reply.getTrace());
+ assertEquals(1, reply.getNumErrors());
+ assertEquals(ErrorCode.NO_ADDRESS_FOR_SERVICE, reply.getError(0).getCode());
+ assertTrace(Arrays.asList("Selecting [dst/session, dst/unknown].",
+ "[NO_ADDRESS_FOR_SERVICE @ localhost]",
+ "Sending reply",
+ "Merged [dst/session, dst/unknown]."),
+ reply.getTrace());
+ }
+
+ @Test
+ public void requireThatPolicyOnlyConsumesDeclaredErrors() {
+ SimpleProtocol protocol = new SimpleProtocol();
+ protocol.addPolicyFactory("Custom", new CustomPolicyFactory());
+ srcServer.mb.putProtocol(protocol);
+ retryPolicy.setEnabled(false);
+ assertTrue(srcSession.send(createMessage("msg"), Route.parse("[Custom:dst/unknown]")).isAccepted());
+ Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60);
+ assertNotNull(reply);
+ System.out.println(reply.getTrace());
+ assertEquals(1, reply.getNumErrors());
+ assertEquals(ErrorCode.NO_ADDRESS_FOR_SERVICE, reply.getError(0).getCode());
+ assertTrace(Arrays.asList("Selecting [dst/unknown].",
+ "[NO_ADDRESS_FOR_SERVICE @ localhost]",
+ "Merged [dst/unknown]."),
+ reply.getTrace());
+ }
+
+ @Test
+ public void requireThatPolicyCanExpandToPolicy() {
+ SimpleProtocol protocol = new SimpleProtocol();
+ protocol.addPolicyFactory("Custom", new CustomPolicyFactory(true, ErrorCode.NO_ADDRESS_FOR_SERVICE));
+ srcServer.mb.putProtocol(protocol);
+ retryPolicy.setEnabled(false);
+ assertTrue(srcSession.send(createMessage("msg"),
+ Route.parse("[Custom:[Custom:dst/session],[Custom:dst/unknown]]")).isAccepted());
+ Message msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60);
+ assertNotNull(msg);
+ dstSession.acknowledge(msg);
+ Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60);
+ assertNotNull(reply);
+ System.out.println(reply.getTrace());
+ assertEquals(1, reply.getNumErrors());
+ assertEquals(ErrorCode.NO_ADDRESS_FOR_SERVICE, reply.getError(0).getCode());
+ }
+
+ @Test
+ public void requireThatReplyCanBeRemovedFromChildNodes() {
+ SimpleProtocol protocol = new SimpleProtocol();
+ protocol.addPolicyFactory("Custom", new SimpleProtocol.PolicyFactory() {
+
+ @Override
+ public RoutingPolicy create(String param) {
+ return new RemoveReplyPolicy(true,
+ Arrays.asList(ErrorCode.NO_ADDRESS_FOR_SERVICE),
+ CustomPolicyFactory.parseRoutes(param),
+ 0);
+ }
+ });
+ srcServer.mb.putProtocol(protocol);
+ retryPolicy.setEnabled(false);
+ assertTrue(srcSession.send(createMessage("msg"),
+ Route.parse("[Custom:[Custom:dst/session],[Custom:dst/unknown]]")).isAccepted());
+ Message msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60);
+ assertNotNull(msg);
+ dstSession.acknowledge(msg);
+ Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60);
+ assertNotNull(reply);
+ System.out.println(reply.getTrace());
+ assertFalse(reply.hasErrors());
+ assertTrace(Arrays.asList("[NO_ADDRESS_FOR_SERVICE @ localhost]",
+ "-[NO_ADDRESS_FOR_SERVICE @ localhost]",
+ "Sending message",
+ "-Sending message"),
+ reply.getTrace());
+ }
+
+ @Test
+ public void requireThatSetReplyWorks() {
+ SimpleProtocol protocol = new SimpleProtocol();
+ protocol.addPolicyFactory("Select", new CustomPolicyFactory(true, ErrorCode.APP_FATAL_ERROR));
+ protocol.addPolicyFactory("SetReply", new SimpleProtocol.PolicyFactory() {
+
+ @Override
+ public RoutingPolicy create(String param) {
+ return new SetReplyPolicy(true, Arrays.asList(ErrorCode.APP_FATAL_ERROR), param);
+ }
+ });
+ srcServer.mb.putProtocol(protocol);
+ retryPolicy.setEnabled(false);
+ assertTrue(
+ srcSession.send(createMessage("msg"), Route.parse("[Select:[SetReply:foo],dst/session]")).isAccepted());
+ Message msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60);
+ assertNotNull(msg);
+ dstSession.acknowledge(msg);
+ Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60);
+ assertNotNull(reply);
+ System.out.println(reply.getTrace());
+ assertEquals(1, reply.getNumErrors());
+ assertEquals(ErrorCode.APP_FATAL_ERROR, reply.getError(0).getCode());
+ assertEquals("foo", reply.getError(0).getMessage());
+ }
+
+ @Test
+ public void requireThatReplyCanBeReusedOnRetry() {
+ SimpleProtocol protocol = new SimpleProtocol();
+ protocol.addPolicyFactory("ReuseReply", new SimpleProtocol.PolicyFactory() {
+
+ @Override
+ public RoutingPolicy create(String param) {
+ return new ReuseReplyPolicy(false,
+ Arrays.asList(ErrorCode.APP_FATAL_ERROR),
+ CustomPolicyFactory.parseRoutes(param));
+ }
+ });
+ protocol.addPolicyFactory("SetReply", new SimpleProtocol.PolicyFactory() {
+
+ @Override
+ public RoutingPolicy create(String param) {
+ return new SetReplyPolicy(false,
+ Arrays.asList(ErrorCode.APP_FATAL_ERROR),
+ param);
+ }
+ });
+ srcServer.mb.putProtocol(protocol);
+ assertTrue(srcSession.send(createMessage("msg"),
+ Route.parse("[ReuseReply:[SetReply:foo],dst/session]")).isAccepted());
+ Message msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60);
+ assertNotNull(msg);
+ Reply reply = new EmptyReply();
+ reply.swapState(msg);
+ reply.addError(new Error(ErrorCode.APP_TRANSIENT_ERROR, "dst"));
+ dstSession.reply(reply);
+ assertNotNull(msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60));
+ dstSession.acknowledge(msg);
+ assertNotNull(reply = ((Receptor)srcSession.getReplyHandler()).getReply(60));
+ System.out.println(reply.getTrace());
+ assertFalse(reply.hasErrors());
+ }
+
+ @Test
+ public void requireThatReplyCanBeRemovedAndRetried() {
+ SimpleProtocol protocol = new SimpleProtocol();
+ protocol.addPolicyFactory("RemoveReply", new SimpleProtocol.PolicyFactory() {
+
+ @Override
+ public RoutingPolicy create(String param) {
+ return new RemoveReplyPolicy(false,
+ Arrays.asList(ErrorCode.APP_TRANSIENT_ERROR),
+ CustomPolicyFactory.parseRoutes(param),
+ 0);
+ }
+ });
+ protocol.addPolicyFactory("SetReply", new SimpleProtocol.PolicyFactory() {
+
+ @Override
+ public RoutingPolicy create(String param) {
+ return new SetReplyPolicy(false,
+ Arrays.asList(ErrorCode.APP_TRANSIENT_ERROR, ErrorCode.APP_FATAL_ERROR),
+ param);
+ }
+ });
+ srcServer.mb.putProtocol(protocol);
+ assertTrue(srcSession
+ .send(createMessage("msg"), Route.parse("[RemoveReply:[SetReply:foo],dst/session]")).isAccepted());
+ Message msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60);
+ assertNotNull(msg);
+ dstSession.acknowledge(msg);
+ Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60);
+ assertNotNull(reply);
+ System.out.println(reply.getTrace());
+ assertEquals(1, reply.getNumErrors());
+ assertEquals(ErrorCode.APP_FATAL_ERROR, reply.getError(0).getCode());
+ assertEquals("foo", reply.getError(0).getMessage());
+ assertTrace(Arrays.asList("Resolving '[SetReply:foo]'.",
+ "Resolving 'dst/session'.",
+ "Resender resending message.",
+ "Resolving 'dst/session'.",
+ "Resolving '[SetReply:foo]'."),
+ reply.getTrace());
+ }
+
+ @Test
+ public void requireThatIgnoreResultWorks() {
+ assertTrue(srcSession.send(createMessage("msg"), Route.parse("?dst/session")).isAccepted());
+ Message msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60);
+ assertNotNull(msg);
+ Reply reply = new EmptyReply();
+ reply.swapState(msg);
+ reply.addError(new Error(ErrorCode.APP_FATAL_ERROR, "dst"));
+ dstSession.reply(reply);
+ assertNotNull(reply = ((Receptor)srcSession.getReplyHandler()).getReply(60));
+ System.out.println(reply.getTrace());
+ assertFalse(reply.hasErrors());
+ assertTrace(Arrays.asList("Not waiting for a reply from 'dst/session'."),
+ reply.getTrace());
+ }
+
+ @Test
+ public void requireThatIgnoreResultCanBeSetInHopBlueprint() {
+ srcServer.setupRouting(new RoutingTableSpec(SimpleProtocol.NAME)
+ .addHop(new HopSpec("foo", "dst/session").setIgnoreResult(true)));
+ assertTrue(srcSession.send(createMessage("msg"), Route.parse("foo")).isAccepted());
+ Message msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60);
+ assertNotNull(msg);
+ Reply reply = new EmptyReply();
+ reply.swapState(msg);
+ reply.addError(new Error(ErrorCode.APP_FATAL_ERROR, "dst"));
+ dstSession.reply(reply);
+ assertNotNull(reply = ((Receptor)srcSession.getReplyHandler()).getReply(60));
+ System.out.println(reply.getTrace());
+ assertFalse(reply.hasErrors());
+ assertTrace(Arrays.asList("Not waiting for a reply from 'dst/session'."),
+ reply.getTrace());
+ }
+
+ @Test
+ public void requireThatIgnoreFlagPersistsThroughHopLookup() {
+ setupRouting(new RoutingTableSpec(SimpleProtocol.NAME).addHop(new HopSpec("foo", "dst/unknown")));
+ assertSend("?foo");
+ assertTrace("Ignoring errors in reply.");
+ }
+
+ @Test
+ public void requireThatIgnoreFlagPersistsThroughRouteLookup() {
+ setupRouting(new RoutingTableSpec(SimpleProtocol.NAME).addRoute(new RouteSpec("foo").addHop("dst/unknown")));
+ assertSend("?foo");
+ assertTrace("Ignoring errors in reply.");
+ }
+
+ @Test
+ public void requireThatIgnoreFlagPersistsThroughPolicySelect() {
+ setupPolicy("Custom", MyPolicy.newSelectAndMerge("dst/unknown"));
+ assertSend("?[Custom]");
+ assertTrace("Ignoring errors in reply.");
+ }
+
+ @Test
+ public void requireThatIgnoreFlagIsSerializedWithMessage() {
+ assertSend("dst/session foo ?bar");
+ Message msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60);
+ assertNotNull(msg);
+ Route route = msg.getRoute();
+ assertEquals(2, route.getNumHops());
+ Hop hop = route.getHop(0);
+ assertEquals("foo", hop.toString());
+ assertFalse(hop.getIgnoreResult());
+ hop = route.getHop(1);
+ assertEquals("?bar", hop.toString());
+ assertTrue(hop.getIgnoreResult());
+ dstSession.acknowledge(msg);
+ assertTrace("-Ignoring errors in reply.");
+ }
+
+ @Test
+ public void requireThatIgnoreFlagDoesNotInterfere() {
+ setupPolicy("Custom", MyPolicy.newSelectAndMerge("dst/session"));
+ assertSend("?[Custom]");
+ assertTrace("-Ignoring errors in reply.");
+ }
+
+ @Test
+ public void requireThatEmptySelectionCanBeIgnored() {
+ setupPolicy("Custom", MyPolicy.newEmptySelection());
+ assertSend("?[Custom]");
+ assertTrace("Ignoring errors in reply.");
+ }
+
+ @Test
+ public void requireThatSelectErrorCanBeIgnored() {
+ setupPolicy("Custom", MyPolicy.newSelectError(ErrorCode.APP_FATAL_ERROR, "foo"));
+ assertSend("?[Custom]");
+ assertTrace("Ignoring errors in reply.");
+ }
+
+ @Test
+ public void requireThatSelectExceptionCanBeIgnored() {
+ setupPolicy("Custom", MyPolicy.newSelectException(new RuntimeException()));
+ assertSend("?[Custom]");
+ assertTrace("Ignoring errors in reply.");
+ }
+
+ @Test
+ public void requireThatSelectAndThrowCanBeIgnored() {
+ setupPolicy("Custom", MyPolicy.newSelectAndThrow("dst/session", new RuntimeException()));
+ assertSend("?[Custom]");
+ assertTrace("Ignoring errors in reply.");
+ }
+
+ @Test
+ public void requireThatEmptyMergeCanBeIgnored() {
+ setupPolicy("Custom", MyPolicy.newEmptyMerge("dst/session"));
+ assertSend("?[Custom]");
+ assertAcknowledge();
+ assertTrace("Ignoring errors in reply.");
+ }
+
+ @Test
+ public void requireThatMergeErrorCanBeIgnored() {
+ setupPolicy("Custom", MyPolicy.newMergeError("dst/session", ErrorCode.APP_FATAL_ERROR, "foo"));
+ assertSend("?[Custom]");
+ assertAcknowledge();
+ assertTrace("Ignoring errors in reply.");
+ }
+
+ @Test
+ public void requireThatMergeExceptionCanBeIgnored() {
+ setupPolicy("Custom", MyPolicy.newMergeException("dst/session", new RuntimeException()));
+ assertSend("?[Custom]");
+ assertAcknowledge();
+ assertTrace("Ignoring errors in reply.");
+ }
+
+ @Test
+ public void requireThatMergeAndThrowCanBeIgnored() {
+ setupPolicy("Custom", MyPolicy.newMergeAndThrow("dst/session", new RuntimeException()));
+ assertSend("?[Custom]");
+ assertAcknowledge();
+ assertTrace("Ignoring errors in reply.");
+ }
+
+ @Test
+ public void requireThatAllocServiceAddressCanBeIgnored() {
+ assertSend("?dst/unknown");
+ assertTrace("Ignoring errors in reply.");
+ }
+
+ @Test
+ public void requireThatDepthLimitCanBeIgnored() {
+ setupPolicy("Custom", MyPolicy.newSelectAndMerge("[Custom]"));
+ assertSend("?[Custom]");
+ assertTrace("Ignoring errors in reply.");
+ }
+
+ @Test
+ public void requireThatRouteCanBeEmptyInDestination() {
+ assertTrue(srcSession.send(createMessage("msg"), Route.parse("dst/session")).isAccepted());
+ Message msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60);
+ assertNotNull(msg);
+ assertNull(msg.getRoute());
+ dstSession.acknowledge(msg);
+ Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60);
+ assertNotNull(reply);
+ System.out.println(reply.getTrace());
+ }
+
+ @Test
+ public void requireThatOnlyActiveNodesAreAborted() {
+ SimpleProtocol protocol = new SimpleProtocol();
+ protocol.addPolicyFactory("Custom", new CustomPolicyFactory(false));
+ protocol.addPolicyFactory("SetReply", new SimpleProtocol.PolicyFactory() {
+
+ @Override
+ public RoutingPolicy create(String param) {
+ return new SetReplyPolicy(false,
+ Arrays.asList(ErrorCode.APP_TRANSIENT_ERROR,
+ ErrorCode.APP_TRANSIENT_ERROR,
+ ErrorCode.APP_FATAL_ERROR),
+ param);
+ }
+ });
+ srcServer.mb.putProtocol(protocol);
+ assertTrue(srcSession.send(createMessage("msg"),
+ Route.parse("[Custom:[SetReply:foo],?bar,dst/session]")).isAccepted());
+ Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60);
+ assertNotNull(reply);
+ System.out.println(reply.getTrace());
+ assertEquals(2, reply.getNumErrors());
+ assertEquals(ErrorCode.APP_FATAL_ERROR, reply.getError(0).getCode());
+ assertEquals(ErrorCode.SEND_ABORTED, reply.getError(1).getCode());
+ }
+
+ @Test
+ public void requireThatTimeoutWorks() {
+ retryPolicy.setBaseDelay(0.01);
+ srcSession.setTimeout(0.5);
+ assertTrue(srcSession.send(createMessage("msg"), Route.parse("dst/unknown")).isAccepted());
+ Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60);
+ assertNotNull(reply);
+ System.out.println(reply.getTrace());
+ assertEquals(2, reply.getNumErrors());
+ assertEquals(ErrorCode.NO_ADDRESS_FOR_SERVICE, reply.getError(0).getCode());
+ assertEquals(ErrorCode.TIMEOUT, reply.getError(1).getCode());
+ }
+
+ @Test
+ public void requireThatUnknownPolicyIsCaught() {
+ assertTrue(srcSession.send(createMessage("msg"), Route.parse("[Unknown]")).isAccepted());
+ Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60);
+ assertNotNull(reply);
+ System.out.println(reply.getTrace());
+ assertEquals(1, reply.getNumErrors());
+ assertEquals(ErrorCode.UNKNOWN_POLICY, reply.getError(0).getCode());
+ }
+
+ private SimpleProtocol.PolicyFactory exceptionOnSelectThrowingMockFactory() {
+ return new SimpleProtocol.PolicyFactory() {
+
+ @Override
+ public RoutingPolicy create(String param) {
+ return new RoutingPolicy() {
+
+ @Override
+ public void select(RoutingContext context) {
+ throw new RuntimeException("69");
+ }
+
+ @Override
+ public void merge(RoutingContext context) {
+ }
+
+ @Override
+ public void destroy() {
+ }
+ };
+ }
+ };
+ }
+
+ @Test
+ public void requireThatSelectExceptionIsCaught() {
+ SimpleProtocol protocol = new SimpleProtocol();
+ protocol.addPolicyFactory("Custom", exceptionOnSelectThrowingMockFactory());
+ srcServer.mb.putProtocol(protocol);
+ assertTrue(srcSession.send(createMessage("msg"), Route.parse("[Custom]")).isAccepted());
+ Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60);
+ assertNotNull(reply);
+ System.out.println(reply.getTrace());
+ assertEquals(1, reply.getNumErrors());
+ assertEquals(ErrorCode.POLICY_ERROR, reply.getError(0).getCode());
+ assertTrue(reply.getError(0).getMessage().contains("69"));
+ }
+
+ @Test
+ public void selectExceptionIncludesStackTraceInMessage() {
+ SimpleProtocol protocol = new SimpleProtocol();
+ protocol.addPolicyFactory("Custom", exceptionOnSelectThrowingMockFactory());
+ srcServer.mb.putProtocol(protocol);
+ assertTrue(srcSession.send(createMessage("msg"), Route.parse("[Custom]")).isAccepted());
+ Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60);
+ assertEquals(ErrorCode.POLICY_ERROR, reply.getError(0).getCode());
+ // Attempting any sort of full matching of the stack trace is brittle, so
+ // simplify by assuming any message which mentions the source file of the
+ // originating exception is good to go.
+ assertTrue(reply.getError(0).getMessage().contains("RoutingTestCase"));
+ }
+
+ @Test
+ public void requireThatMergeExceptionIsCaught() {
+ SimpleProtocol protocol = new SimpleProtocol();
+ protocol.addPolicyFactory("Custom", new SimpleProtocol.PolicyFactory() {
+
+ @Override
+ public RoutingPolicy create(String param) {
+ return new RoutingPolicy() {
+
+ @Override
+ public void select(RoutingContext context) {
+ context.addChild(Route.parse("dst/session"));
+ }
+
+ @Override
+ public void merge(RoutingContext context) {
+ throw new RuntimeException("69");
+ }
+
+ @Override
+ public void destroy() {
+
+ }
+ };
+ }
+ });
+ srcServer.mb.putProtocol(protocol);
+ assertTrue(srcSession.send(createMessage("msg"), Route.parse("[Custom]")).isAccepted());
+ Message msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60);
+ assertNotNull(msg);
+ dstSession.acknowledge(msg);
+ Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60);
+ assertNotNull(reply);
+ System.out.println(reply.getTrace());
+ assertEquals(1, reply.getNumErrors());
+ assertEquals(ErrorCode.POLICY_ERROR, reply.getError(0).getCode());
+ assertTrue(reply.getError(0).getMessage().contains("69"));
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////
+ //
+ // Utilities
+ //
+ ////////////////////////////////////////////////////////////////////////////////
+
+ private static Message createMessage(String msg) {
+ SimpleMessage ret = new SimpleMessage(msg);
+ ret.getTrace().setLevel(9);
+ return ret;
+ }
+
+ private void setupRouting(RoutingTableSpec spec) {
+ srcServer.setupRouting(spec);
+ }
+
+ private void setupPolicy(String policyName, SimpleProtocol.PolicyFactory policyFactory) {
+ SimpleProtocol protocol = new SimpleProtocol();
+ protocol.addPolicyFactory(policyName, policyFactory);
+ srcServer.mb.putProtocol(protocol);
+ }
+
+ private void assertSend(String route) {
+ assertTrue(srcSession.send(createMessage("msg").setRoute(Route.parse(route))).isAccepted());
+ }
+
+ private void assertAcknowledge() {
+ Message msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60);
+ assertNotNull(msg);
+ dstSession.acknowledge(msg);
+ }
+
+ private void assertTrace(String... expectedTrace) {
+ Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60);
+ assertNotNull(reply);
+ System.out.println(reply.getTrace());
+ assertFalse(reply.hasErrors());
+ assertTrace(Arrays.asList(expectedTrace), reply.getTrace());
+ }
+
+ public static void assertTrace(List<String> expected, Trace trace) {
+ String actual = trace.toString();
+ for (int i = 0, pos = -1; i < expected.size(); ++i) {
+ String line = expected.get(i).replaceFirst("\\$\\{VERSION\\}", Vtag.currentVersion.toString());
+ if (line.charAt(0) == '-') {
+ String str = line.substring(1);
+ assertTrue("Line " + i + " '" + str + "' not expected.",
+ actual.indexOf(str, pos + 1) < 0);
+ } else {
+ pos = actual.indexOf(line, pos + 1);
+ assertTrue("Line " + i + " '" + line + "' missing.", pos >= 0);
+ }
+ }
+ }
+
+ private static class RemoveReplyPolicy extends CustomPolicy {
+
+ final int idxRemove;
+
+ public RemoveReplyPolicy(boolean selectOnRetry, List<Integer> consumableErrors, List<Route> routes,
+ int idxRemove) {
+ super(selectOnRetry, consumableErrors, routes);
+ this.idxRemove = idxRemove;
+ }
+
+ public void merge(RoutingContext ctx) {
+ ctx.setReply(ctx.getChildIterator().skip(idxRemove).removeReply());
+ }
+
+ @Override
+ public void destroy() {
+ }
+ }
+
+ private static class ReuseReplyPolicy extends CustomPolicy {
+
+ final List<Integer> errorMask = new ArrayList<>();
+
+ public ReuseReplyPolicy(boolean selectOnRetry, List<Integer> errorMask,
+ List<Route> routes) {
+ super(selectOnRetry, errorMask, routes);
+ this.errorMask.addAll(errorMask);
+ }
+
+ public void merge(RoutingContext ctx) {
+ Reply ret = new EmptyReply();
+ int idx = 0;
+ int idxFirstOk = -1;
+ for (RoutingNodeIterator it = ctx.getChildIterator();
+ it.isValid(); it.next(), ++idx) {
+ Reply ref = it.getReplyRef();
+ if (!ref.hasErrors()) {
+ if (idxFirstOk < 0) {
+ idxFirstOk = idx;
+ }
+ } else {
+ for (int i = 0; i < ref.getNumErrors(); ++i) {
+ Error err = ref.getError(i);
+ if (!errorMask.contains(err.getCode())) {
+ ret.addError(err);
+ }
+ }
+ }
+ }
+ if (ret.hasErrors()) {
+ ctx.setReply(ret);
+ } else {
+ ctx.setReply(ctx.getChildIterator().skip(idxFirstOk).removeReply());
+ }
+ }
+
+ @Override
+ public void destroy() {
+ }
+ }
+
+ private static class SetReplyPolicy implements RoutingPolicy {
+
+ final boolean selectOnRetry;
+ final List<Integer> errors = new ArrayList<>();
+ final String param;
+ int idx = 0;
+
+ public SetReplyPolicy(boolean selectOnRetry, List<Integer> errors, String param) {
+ this.selectOnRetry = selectOnRetry;
+ this.errors.addAll(errors);
+ this.param = param;
+ }
+
+ public void select(RoutingContext ctx) {
+ int idx = this.idx++;
+ int err = errors.get(idx < errors.size() ? idx : errors.size() - 1);
+ if (err != ErrorCode.NONE) {
+ ctx.setError(err, param);
+ } else {
+ ctx.setReply(new EmptyReply());
+ }
+ ctx.setSelectOnRetry(selectOnRetry);
+ }
+
+ public void merge(RoutingContext ctx) {
+ Reply reply = new EmptyReply();
+ reply.addError(new Error(ErrorCode.FATAL_ERROR,
+ "Merge should not be called when select() sets a reply."));
+ ctx.setReply(reply);
+ }
+
+ public void destroy() {
+ }
+ }
+
+ private static class MyPolicy implements SimpleProtocol.PolicyFactory {
+
+ final Route selectRoute;
+ final Reply selectReply;
+ final Reply mergeReply;
+ final RuntimeException selectException;
+ final RuntimeException mergeException;
+ final boolean mergeFromChild;
+
+ MyPolicy(Route selectRoute, Reply selectReply, RuntimeException selectException,
+ Reply mergeReply, RuntimeException mergeException, boolean mergeFromChild) {
+ this.selectRoute = selectRoute;
+ this.selectReply = selectReply;
+ this.selectException = selectException;
+ this.mergeReply = mergeReply;
+ this.mergeException = mergeException;
+ this.mergeFromChild = mergeFromChild;
+ }
+
+ @Override
+ public RoutingPolicy create(String param) {
+ return new RoutingPolicy() {
+
+ @Override
+ public void select(RoutingContext context) {
+ if (selectRoute != null) {
+ context.addChild(selectRoute);
+ }
+ if (selectReply != null) {
+ context.setReply(selectReply);
+ }
+ if (selectException != null) {
+ throw selectException;
+ }
+ }
+
+ @Override
+ public void merge(RoutingContext context) {
+ if (mergeReply != null) {
+ context.setReply(mergeReply);
+ } else if (mergeFromChild) {
+ context.setReply(context.getChildIterator().removeReply());
+ }
+ if (mergeException != null) {
+ throw mergeException;
+ }
+ }
+
+ @Override
+ public void destroy() {
+
+ }
+ };
+ }
+
+ static Reply newErrorReply(int errCode, String errMessage) {
+ Reply reply = new EmptyReply();
+ reply.addError(new Error(errCode, errMessage));
+ return reply;
+ }
+
+ static MyPolicy newSelectAndMerge(String select) {
+ return new MyPolicy(Route.parse(select), null, null, null, null, true);
+ }
+
+ static MyPolicy newEmptySelection() {
+ return new MyPolicy(null, null, null, null, null, false);
+ }
+
+ static MyPolicy newSelectError(int errCode, String errMessage) {
+ return new MyPolicy(null, newErrorReply(errCode, errMessage), null, null, null, false);
+ }
+
+ static MyPolicy newSelectException(RuntimeException e) {
+ return new MyPolicy(null, null, e, null, null, false);
+ }
+
+ static MyPolicy newSelectAndThrow(String select, RuntimeException e) {
+ return new MyPolicy(Route.parse(select), null, e, null, null, false);
+ }
+
+ static MyPolicy newEmptyMerge(String select) {
+ return new MyPolicy(Route.parse(select), null, null, null, null, false);
+ }
+
+ static MyPolicy newMergeError(String select, int errCode, String errMessage) {
+ return new MyPolicy(Route.parse(select), null, null, newErrorReply(errCode, errMessage), null, false);
+ }
+
+ static MyPolicy newMergeException(String select, RuntimeException e) {
+ return new MyPolicy(Route.parse(select), null, null, null, e, false);
+ }
+
+ static MyPolicy newMergeAndThrow(String select, RuntimeException e) {
+ return new MyPolicy(Route.parse(select), null, null, null, e, true);
+ }
+ }
+}
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/test/QueueAdapterTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/test/QueueAdapterTestCase.java
new file mode 100644
index 00000000000..000505f6349
--- /dev/null
+++ b/messagebus/src/test/java/com/yahoo/messagebus/test/QueueAdapterTestCase.java
@@ -0,0 +1,108 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.test;
+
+import com.yahoo.messagebus.Message;
+import com.yahoo.messagebus.Reply;
+import com.yahoo.text.Utf8String;
+import org.junit.Test;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.*;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen Hult</a>
+ */
+public class QueueAdapterTestCase {
+
+ private static final int NO_WAIT = 0;
+ private static final int WAIT_FOREVER = 60;
+
+ @Test
+ public void requireThatAccessorsWork() {
+ QueueAdapter queue = new QueueAdapter();
+ assertTrue(queue.isEmpty());
+ assertEquals(0, queue.size());
+
+ Message msg = new MyMessage();
+ queue.handleMessage(msg);
+ assertFalse(queue.isEmpty());
+ assertEquals(1, queue.size());
+
+ MyReply reply = new MyReply();
+ queue.handleReply(reply);
+ assertFalse(queue.isEmpty());
+ assertEquals(2, queue.size());
+
+ assertSame(msg, queue.dequeue());
+ assertSame(reply, queue.dequeue());
+ }
+
+ @Test
+ public void requireThatSizeCanBeWaitedFor() {
+ final QueueAdapter queue = new QueueAdapter();
+ assertTrue(queue.waitSize(0, NO_WAIT));
+ assertFalse(queue.waitSize(1, NO_WAIT));
+ queue.handleMessage(new MyMessage());
+ assertFalse(queue.waitSize(0, NO_WAIT));
+ assertTrue(queue.waitSize(1, NO_WAIT));
+
+ Thread thread = new Thread() {
+
+ @Override
+ public void run() {
+ try {
+ Thread.sleep(100);
+ queue.handleMessage(new MyMessage());
+ } catch (InterruptedException e) {
+
+ }
+ }
+ };
+ thread.start();
+ assertTrue(queue.waitSize(2, WAIT_FOREVER));
+ }
+
+ @Test
+ public void requireThatWaitCanBeInterrupted() throws InterruptedException {
+ final QueueAdapter queue = new QueueAdapter();
+ final AtomicReference<Boolean> result = new AtomicReference<>();
+ Thread thread = new Thread() {
+
+ @Override
+ public void run() {
+ result.set(queue.waitSize(1, WAIT_FOREVER));
+ }
+ };
+ thread.start();
+ thread.interrupt();
+ thread.join();
+ assertEquals(Boolean.FALSE, result.get());
+ }
+
+ private static class MyMessage extends Message {
+
+ @Override
+ public Utf8String getProtocol() {
+ return null;
+ }
+
+ @Override
+ public int getType() {
+ return 0;
+ }
+ }
+
+ private static class MyReply extends Reply {
+
+ @Override
+ public Utf8String getProtocol() {
+ return null;
+ }
+
+ @Override
+ public int getType() {
+ return 0;
+ }
+ }
+}
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/test/ReceptorTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/test/ReceptorTestCase.java
new file mode 100644
index 00000000000..a34f37b0196
--- /dev/null
+++ b/messagebus/src/test/java/com/yahoo/messagebus/test/ReceptorTestCase.java
@@ -0,0 +1,143 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.test;
+
+import com.yahoo.messagebus.Message;
+import com.yahoo.messagebus.Reply;
+import com.yahoo.text.Utf8String;
+import org.junit.Test;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.*;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen Hult</a>
+ */
+public class ReceptorTestCase {
+
+ @Test
+ public void requireThatAccessorsWork() {
+ Receptor receptor = new Receptor();
+ assertNull(receptor.getMessage(0));
+ Message msg = new MyMessage();
+ receptor.handleMessage(msg);
+ assertSame(msg, receptor.getMessage(0));
+
+ Reply reply = new MyReply();
+ receptor.handleReply(reply);
+ assertSame(reply, receptor.getReply(0));
+ }
+
+ @Test
+ public void requireThatMessagesAndRepliesAreTrackedIndividually() {
+ Receptor receptor = new Receptor();
+ receptor.handleMessage(new MyMessage());
+ receptor.handleReply(new MyReply());
+ assertNotNull(receptor.getMessage(0));
+ assertNotNull(receptor.getReply(0));
+
+ receptor.handleMessage(new MyMessage());
+ receptor.handleReply(new MyReply());
+ assertNotNull(receptor.getReply(0));
+ assertNotNull(receptor.getMessage(0));
+ }
+
+ @Test
+ public void requireThatMessagesCanBeWaitedFor() {
+ final Receptor receptor = new Receptor();
+ Thread thread = new Thread() {
+
+ @Override
+ public void run() {
+ try {
+ Thread.sleep(100);
+ receptor.handleMessage(new MyMessage());
+ } catch (InterruptedException e) {
+
+ }
+ }
+ };
+ thread.start();
+ assertNotNull(receptor.getMessage(60));
+ }
+
+ @Test
+ public void requireThatMessageWaitCanBeInterrupted() throws InterruptedException {
+ final Receptor receptor = new Receptor();
+ final CountDownLatch latch = new CountDownLatch(1);
+ Thread thread = new Thread() {
+
+ @Override
+ public void run() {
+ receptor.getMessage(60);
+ latch.countDown();
+ }
+ };
+ thread.start();
+ thread.interrupt();
+ assertTrue(latch.await(30, TimeUnit.SECONDS));
+ }
+
+ @Test
+ public void requireThatRepliesCanBeWaitedFor() {
+ final Receptor receptor = new Receptor();
+ Thread thread = new Thread() {
+
+ @Override
+ public void run() {
+ try {
+ Thread.sleep(100);
+ receptor.handleReply(new MyReply());
+ } catch (InterruptedException e) {
+
+ }
+ }
+ };
+ thread.start();
+ assertNotNull(receptor.getReply(60));
+ }
+
+ @Test
+ public void requireThatReplyWaitCanBeInterrupted() throws InterruptedException {
+ final Receptor receptor = new Receptor();
+ final CountDownLatch latch = new CountDownLatch(1);
+ Thread thread = new Thread() {
+
+ @Override
+ public void run() {
+ receptor.getReply(60);
+ latch.countDown();
+ }
+ };
+ thread.start();
+ thread.interrupt();
+ assertTrue(latch.await(30, TimeUnit.SECONDS));
+ }
+
+ private static class MyMessage extends Message {
+
+ @Override
+ public Utf8String getProtocol() {
+ return null;
+ }
+
+ @Override
+ public int getType() {
+ return 0;
+ }
+ }
+
+ private static class MyReply extends Reply {
+
+ @Override
+ public Utf8String getProtocol() {
+ return null;
+ }
+
+ @Override
+ public int getType() {
+ return 0;
+ }
+ }
+}
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/test/SimpleMessageTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/test/SimpleMessageTestCase.java
new file mode 100644
index 00000000000..ead0fdb88b4
--- /dev/null
+++ b/messagebus/src/test/java/com/yahoo/messagebus/test/SimpleMessageTestCase.java
@@ -0,0 +1,25 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.test;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen Hult</a>
+ */
+public class SimpleMessageTestCase {
+
+ @Test
+ public void requireThatAccessorsWork() {
+ SimpleMessage msg = new SimpleMessage("foo");
+ assertEquals(SimpleProtocol.MESSAGE, msg.getType());
+ assertEquals(SimpleProtocol.NAME, msg.getProtocol());
+ assertEquals(3, msg.getApproxSize());
+ assertEquals("foo", msg.getValue());
+ msg.setValue("bar");
+ assertEquals("bar", msg.getValue());
+ }
+}
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/test/SimpleProtocolTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/test/SimpleProtocolTestCase.java
new file mode 100644
index 00000000000..ce42762f235
--- /dev/null
+++ b/messagebus/src/test/java/com/yahoo/messagebus/test/SimpleProtocolTestCase.java
@@ -0,0 +1,65 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.test;
+
+import com.yahoo.component.Version;
+import com.yahoo.messagebus.EmptyReply;
+import com.yahoo.messagebus.Routable;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen Hult</a>
+ */
+public class SimpleProtocolTestCase {
+
+ private static final Version VERSION = new Version(1);
+ private static final SimpleProtocol PROTOCOL = new SimpleProtocol();
+
+ @Test
+ public void requireThatNameIsSet() {
+ assertEquals(SimpleProtocol.NAME, PROTOCOL.getName());
+ }
+
+ @Test
+ public void requireThatMetricSetIsNull() {
+ assertNull(PROTOCOL.getMetrics());
+ }
+
+ @Test
+ public void requireThatMessageCanBeEncodedAndDecoded() {
+ SimpleMessage msg = new SimpleMessage("foo");
+ byte[] buf = PROTOCOL.encode(Version.emptyVersion, msg);
+ Routable routable = PROTOCOL.decode(Version.emptyVersion, buf);
+ assertNotNull(routable);
+ assertEquals(SimpleMessage.class, routable.getClass());
+ msg = (SimpleMessage)routable;
+ assertEquals("foo", msg.getValue());
+ }
+
+ @Test
+ public void requireThatReplyCanBeDecoded() {
+ SimpleReply reply = new SimpleReply("foo");
+ byte[] buf = PROTOCOL.encode(Version.emptyVersion, reply);
+ Routable routable = PROTOCOL.decode(Version.emptyVersion, buf);
+ assertNotNull(routable);
+ assertEquals(SimpleReply.class, routable.getClass());
+ reply = (SimpleReply)routable;
+ assertEquals("foo", reply.getValue());
+ }
+
+ @Test
+ public void requireThatUnknownRoutablesAreNotEncoded() {
+ assertNull(PROTOCOL.encode(VERSION, new EmptyReply()));
+ }
+
+ @Test
+ public void requireThatEmptyBufferIsNotDecoded() {
+ assertNull(PROTOCOL.decode(VERSION, new byte[0]));
+ }
+
+ @Test
+ public void requireThatUnknownBufferIsNotDecoded() {
+ assertNull(PROTOCOL.decode(VERSION, new byte[] { 'U' }));
+ }
+}
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/test/SimpleReplyTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/test/SimpleReplyTestCase.java
new file mode 100644
index 00000000000..474fea14ac7
--- /dev/null
+++ b/messagebus/src/test/java/com/yahoo/messagebus/test/SimpleReplyTestCase.java
@@ -0,0 +1,22 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.test;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen Hult</a>
+ */
+public class SimpleReplyTestCase {
+
+ @Test
+ public void requireThatAccessorsWork() {
+ SimpleReply reply = new SimpleReply("foo");
+ assertEquals(SimpleProtocol.REPLY, reply.getType());
+ assertEquals(SimpleProtocol.NAME, reply.getProtocol());
+ assertEquals("foo", reply.getValue());
+ reply.setValue("bar");
+ assertEquals("bar", reply.getValue());
+ }
+}