summaryrefslogtreecommitdiffstats
path: root/messagebus
diff options
context:
space:
mode:
authorArne H Juul <arnej@yahoo-inc.com>2017-01-03 14:41:04 +0100
committerArne H Juul <arnej@yahoo-inc.com>2017-01-03 14:41:04 +0100
commitc73ae73b3c7bc8ebe7eb17fe12afdef1976563c7 (patch)
tree480337b08ec6c62bbcf2aea8132b1d3cdc4fd7e8 /messagebus
parent8b423e4c115d647307349e8be621fe189f27cb2c (diff)
whitespace fixups.
Diffstat (limited to 'messagebus')
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/DynamicThrottlePolicy.java510
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java2
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/NetworkMessageBus.java12
-rwxr-xr-xmessagebus/src/main/java/com/yahoo/messagebus/ProtocolRepository.java216
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/RPCMessageBus.java6
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/SendProxy.java184
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/StaticThrottlePolicy.java180
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/ThrottlePolicy.java70
-rwxr-xr-xmessagebus/src/main/java/com/yahoo/messagebus/TraceLevel.java56
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalNetwork.java2
-rwxr-xr-xmessagebus/src/main/java/com/yahoo/messagebus/routing/ApplicationSpec.java260
-rwxr-xr-xmessagebus/src/main/java/com/yahoo/messagebus/routing/HopDirective.java50
-rwxr-xr-xmessagebus/src/main/java/com/yahoo/messagebus/routing/PolicyDirective.java162
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/routing/RetryPolicy.java58
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/routing/RetryTransientErrorsPolicy.java94
-rwxr-xr-xmessagebus/src/main/java/com/yahoo/messagebus/routing/RouteDirective.java124
-rwxr-xr-xmessagebus/src/main/java/com/yahoo/messagebus/routing/RoutingNodeIterator.java212
-rwxr-xr-xmessagebus/src/main/java/com/yahoo/messagebus/routing/TcpDirective.java198
-rwxr-xr-xmessagebus/src/main/java/com/yahoo/messagebus/routing/VerbatimDirective.java128
-rwxr-xr-xmessagebus/src/main/java/com/yahoo/messagebus/routing/test/CustomPolicyFactory.java102
-rwxr-xr-xmessagebus/src/test/java/com/yahoo/messagebus/ErrorTestCase.java182
-rw-r--r--messagebus/src/test/java/com/yahoo/messagebus/SendProxyTestCase.java336
-rwxr-xr-xmessagebus/src/test/java/com/yahoo/messagebus/SimpleTripTestCase.java104
-rwxr-xr-xmessagebus/src/test/java/com/yahoo/messagebus/network/rpc/SendAdapterTestCase.java312
-rwxr-xr-xmessagebus/src/test/java/com/yahoo/messagebus/network/rpc/ServiceAddressTestCase.java178
-rw-r--r--messagebus/src/test/java/com/yahoo/messagebus/network/rpc/ServicePoolTestCase.java110
-rwxr-xr-xmessagebus/src/test/java/com/yahoo/messagebus/network/rpc/TargetPoolTestCase.java220
-rw-r--r--messagebus/src/test/java/com/yahoo/messagebus/routing/RetryPolicyTestCase.java62
-rwxr-xr-xmessagebus/src/test/java/com/yahoo/messagebus/routing/RoutingSpecTestCase.java670
29 files changed, 2400 insertions, 2400 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/DynamicThrottlePolicy.java b/messagebus/src/main/java/com/yahoo/messagebus/DynamicThrottlePolicy.java
index 9d6af3a84a9..57ff0028585 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/DynamicThrottlePolicy.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/DynamicThrottlePolicy.java
@@ -1,256 +1,256 @@
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.messagebus;
-
-import com.yahoo.concurrent.SystemTimer;
-import com.yahoo.concurrent.Timer;
-import com.yahoo.log.LogLevel;
-import java.util.logging.Logger;
-
-/**
- * This is an implementatin of the {@link ThrottlePolicy} that offers dynamic limits to the number of pending messages a
- * {@link SourceSession} is allowed to have.
- *
- * <b>NOTE:</b> By context, "pending" is refering to the number of sent messages that have not been replied to yet.
- *
- * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
- */
-public class DynamicThrottlePolicy extends StaticThrottlePolicy {
-
- private static final long IDLE_TIME_MILLIS = 60000;
- private final Timer timer;
- private int numSent = 0;
- private int numOk = 0;
- private double resizeRate = 3;
- private long resizeTime = 0;
- private long timeOfLastMessage;
- private double efficiencyThreshold = 1.0;
- private double windowSizeIncrement = 20;
- private double windowSize = windowSizeIncrement;
- private double minWindowSize = windowSizeIncrement;
- private double maxWindowSize = Integer.MAX_VALUE;
- private double windowSizeBackOff = 0.9;
- private double weight = 1.0;
- private double localMaxThroughput = 0;
- private double maxThroughput = 0;
- private static final Logger log = Logger.getLogger(DynamicThrottlePolicy.class.getName());
-
- /**
- * Constructs a new instance of this policy and sets the appropriate default values of member data.
- */
- public DynamicThrottlePolicy() {
- this(SystemTimer.INSTANCE);
- }
-
- /**
- * Constructs a new instance of this class using the given clock to calculate efficiency.
- *
- * @param timer The timer to use.
- */
- public DynamicThrottlePolicy(Timer timer) {
- this.timer = timer;
- this.timeOfLastMessage = timer.milliTime();
- }
-
- public double getWindowSizeIncrement() {
- return windowSizeIncrement;
- }
-
- public double getWindowSizeBackOff() {
- return windowSizeBackOff;
- }
-
- public void setMaxThroughput(double maxThroughput) {
- this.maxThroughput = maxThroughput;
- }
-
- @Override
- public boolean canSend(Message msg, int pendingCount) {
- if (!super.canSend(msg, pendingCount)) {
- return false;
- }
- long time = timer.milliTime();
- double elapsed = (time - timeOfLastMessage);
- if (elapsed > IDLE_TIME_MILLIS) {
- windowSize = Math.min(windowSize, pendingCount + windowSizeIncrement);
- }
- timeOfLastMessage = time;
- return pendingCount < windowSize;
- }
-
- @Override
- public void processMessage(Message msg) {
- super.processMessage(msg);
- if (++numSent < windowSize * resizeRate) {
- return;
- }
-
- long time = timer.milliTime();
- double elapsed = time - resizeTime;
- resizeTime = time;
-
- double throughput = numOk / elapsed;
- numSent = 0;
- numOk = 0;
-
- if (log.isLoggable(LogLevel.DEBUG)) {
- log.log(LogLevel.DEBUG, "windowSize " + windowSize + " throughput " + throughput);
- }
-
- if (maxThroughput > 0 && throughput > maxThroughput * 0.95) {
- // No need to increase window when we're this close to max.
- } else if (throughput > localMaxThroughput * 1.01) {
- localMaxThroughput = throughput;
- windowSize += weight*windowSizeIncrement;
- } else {
- // scale up/down throughput for comparing to window size
- double period = 1;
- while(throughput * period/windowSize < 2) {
- period *= 10;
- }
- while(throughput * period/windowSize > 2) {
- period *= 0.1;
- }
- double efficiency = throughput*period/windowSize;
- if (efficiency < efficiencyThreshold) {
- double newSize = Math.min(windowSize,throughput * period);
- windowSize = Math.min(windowSize * windowSizeBackOff, windowSize - 2* windowSizeIncrement);
- localMaxThroughput = 0;
- } else {
- windowSize += weight*windowSizeIncrement;
- }
- }
- windowSize = Math.max(minWindowSize, windowSize);
- windowSize = Math.min(maxWindowSize, windowSize);
- }
-
- @Override
- public void processReply(Reply reply) {
- super.processReply(reply);
- if (!reply.hasErrors()) {
- ++numOk;
- }
- }
-
- /**
- * Sets the lower efficiency threshold at which the algorithm should perform window size back off. Efficiency is
- * the correlation between throughput and window size. The algorithm will increase the window size until efficiency
- * drops below the efficiency of the local maxima times this value.
- *
- * @param efficiencyThreshold The limit to set.
- * @return This, to allow chaining.
- * @see #setWindowSizeBackOff(double)
- */
- public DynamicThrottlePolicy setEfficiencyThreshold(double efficiencyThreshold) {
- this.efficiencyThreshold = efficiencyThreshold;
- return this;
- }
-
- /**
- * Sets the step size used when increasing window size.
- *
- * @param windowSizeIncrement The step size to set.
- * @return This, to allow chaining.
- */
- public DynamicThrottlePolicy setWindowSizeIncrement(double windowSizeIncrement) {
- this.windowSizeIncrement = windowSizeIncrement;
- return this;
- }
-
- /**
- * Sets the factor of window size to back off to when the algorithm determines that efficiency is not increasing.
- * A value of 1 means that there is no back off from the local maxima, and means that the algorithm will fail to
- * reduce window size to something lower than a previous maxima. This value is capped to the [0, 1] range.
- *
- * @param windowSizeBackOff The back off to set.
- * @return This, to allow chaining.
- */
- public DynamicThrottlePolicy setWindowSizeBackOff(double windowSizeBackOff) {
- this.windowSizeBackOff = Math.max(0, Math.min(1, windowSizeBackOff));
- return this;
- }
-
- /**
- * Sets the rate at which the window size is updated. The larger the value, the less responsive the resizing
- * becomes. However, the smaller the value, the less accurate the measurements become.
- *
- * @param resizeRate The rate to set.
- * @return This, to allow chaining.
- */
- public DynamicThrottlePolicy setResizeRate(double resizeRate) {
- this.resizeRate = resizeRate;
- return this;
- }
-
- /**
- * Sets the weight for this client. The larger the value, the more resources
- * will be allocated to this clients. Resources are shared between clients
- * proportiannally to their weights.
- *
- * @param weight The weight to set.
- * @return This, to allow chaining.
- */
- public DynamicThrottlePolicy setWeight(double weight) {
- this.weight = weight;
- return this;
- }
-
- /**
- * Sets the maximium number of pending operations allowed at any time, in
- * order to avoid using too much resources.
- *
- * @param max The max to set.
- * @return This, to allow chaining.
- */
- public DynamicThrottlePolicy setMaxWindowSize(double max) {
- this.maxWindowSize = max;
- return this;
- }
-
- /**
- * Get the maximum number of pending operations allowed at any time.
- *
- * @return The maximum number of operations.
- */
- public double getMaxWindowSize() {
- return maxWindowSize;
- }
-
-
- /**
- * Sets the minimium number of pending operations allowed at any time, in
- * order to keep a level of performance.
- *
- * @param min The min to set.
- * @return This, to allow chaining.
- */
- public DynamicThrottlePolicy setMinWindowSize(double min) {
- this.minWindowSize = min;
- return this;
- }
-
- /**
- * Get the minimum number of pending operations allowed at any time.
- *
- * @return The minimum number of operations.
- */
- public double getMinWindowSize() {
- return minWindowSize;
- }
-
- public DynamicThrottlePolicy setMaxPendingCount(int maxCount) {
- super.setMaxPendingCount(maxCount);
- maxWindowSize = maxCount;
- return this;
- }
-
-
- /**
- * Returns the maximum number of pending messages allowed.
- *
- * @return The max limit.
- */
- public int getMaxPendingCount() {
- return (int)windowSize;
- }
-
-}
+package com.yahoo.messagebus;
+
+import com.yahoo.concurrent.SystemTimer;
+import com.yahoo.concurrent.Timer;
+import com.yahoo.log.LogLevel;
+import java.util.logging.Logger;
+
+/**
+ * This is an implementatin of the {@link ThrottlePolicy} that offers dynamic limits to the number of pending messages a
+ * {@link SourceSession} is allowed to have.
+ *
+ * <b>NOTE:</b> By context, "pending" is refering to the number of sent messages that have not been replied to yet.
+ *
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public class DynamicThrottlePolicy extends StaticThrottlePolicy {
+
+ private static final long IDLE_TIME_MILLIS = 60000;
+ private final Timer timer;
+ private int numSent = 0;
+ private int numOk = 0;
+ private double resizeRate = 3;
+ private long resizeTime = 0;
+ private long timeOfLastMessage;
+ private double efficiencyThreshold = 1.0;
+ private double windowSizeIncrement = 20;
+ private double windowSize = windowSizeIncrement;
+ private double minWindowSize = windowSizeIncrement;
+ private double maxWindowSize = Integer.MAX_VALUE;
+ private double windowSizeBackOff = 0.9;
+ private double weight = 1.0;
+ private double localMaxThroughput = 0;
+ private double maxThroughput = 0;
+ private static final Logger log = Logger.getLogger(DynamicThrottlePolicy.class.getName());
+
+ /**
+ * Constructs a new instance of this policy and sets the appropriate default values of member data.
+ */
+ public DynamicThrottlePolicy() {
+ this(SystemTimer.INSTANCE);
+ }
+
+ /**
+ * Constructs a new instance of this class using the given clock to calculate efficiency.
+ *
+ * @param timer The timer to use.
+ */
+ public DynamicThrottlePolicy(Timer timer) {
+ this.timer = timer;
+ this.timeOfLastMessage = timer.milliTime();
+ }
+
+ public double getWindowSizeIncrement() {
+ return windowSizeIncrement;
+ }
+
+ public double getWindowSizeBackOff() {
+ return windowSizeBackOff;
+ }
+
+ public void setMaxThroughput(double maxThroughput) {
+ this.maxThroughput = maxThroughput;
+ }
+
+ @Override
+ public boolean canSend(Message msg, int pendingCount) {
+ if (!super.canSend(msg, pendingCount)) {
+ return false;
+ }
+ long time = timer.milliTime();
+ double elapsed = (time - timeOfLastMessage);
+ if (elapsed > IDLE_TIME_MILLIS) {
+ windowSize = Math.min(windowSize, pendingCount + windowSizeIncrement);
+ }
+ timeOfLastMessage = time;
+ return pendingCount < windowSize;
+ }
+
+ @Override
+ public void processMessage(Message msg) {
+ super.processMessage(msg);
+ if (++numSent < windowSize * resizeRate) {
+ return;
+ }
+
+ long time = timer.milliTime();
+ double elapsed = time - resizeTime;
+ resizeTime = time;
+
+ double throughput = numOk / elapsed;
+ numSent = 0;
+ numOk = 0;
+
+ if (log.isLoggable(LogLevel.DEBUG)) {
+ log.log(LogLevel.DEBUG, "windowSize " + windowSize + " throughput " + throughput);
+ }
+
+ if (maxThroughput > 0 && throughput > maxThroughput * 0.95) {
+ // No need to increase window when we're this close to max.
+ } else if (throughput > localMaxThroughput * 1.01) {
+ localMaxThroughput = throughput;
+ windowSize += weight*windowSizeIncrement;
+ } else {
+ // scale up/down throughput for comparing to window size
+ double period = 1;
+ while(throughput * period/windowSize < 2) {
+ period *= 10;
+ }
+ while(throughput * period/windowSize > 2) {
+ period *= 0.1;
+ }
+ double efficiency = throughput*period/windowSize;
+ if (efficiency < efficiencyThreshold) {
+ double newSize = Math.min(windowSize,throughput * period);
+ windowSize = Math.min(windowSize * windowSizeBackOff, windowSize - 2* windowSizeIncrement);
+ localMaxThroughput = 0;
+ } else {
+ windowSize += weight*windowSizeIncrement;
+ }
+ }
+ windowSize = Math.max(minWindowSize, windowSize);
+ windowSize = Math.min(maxWindowSize, windowSize);
+ }
+
+ @Override
+ public void processReply(Reply reply) {
+ super.processReply(reply);
+ if (!reply.hasErrors()) {
+ ++numOk;
+ }
+ }
+
+ /**
+ * Sets the lower efficiency threshold at which the algorithm should perform window size back off. Efficiency is
+ * the correlation between throughput and window size. The algorithm will increase the window size until efficiency
+ * drops below the efficiency of the local maxima times this value.
+ *
+ * @param efficiencyThreshold The limit to set.
+ * @return This, to allow chaining.
+ * @see #setWindowSizeBackOff(double)
+ */
+ public DynamicThrottlePolicy setEfficiencyThreshold(double efficiencyThreshold) {
+ this.efficiencyThreshold = efficiencyThreshold;
+ return this;
+ }
+
+ /**
+ * Sets the step size used when increasing window size.
+ *
+ * @param windowSizeIncrement The step size to set.
+ * @return This, to allow chaining.
+ */
+ public DynamicThrottlePolicy setWindowSizeIncrement(double windowSizeIncrement) {
+ this.windowSizeIncrement = windowSizeIncrement;
+ return this;
+ }
+
+ /**
+ * Sets the factor of window size to back off to when the algorithm determines that efficiency is not increasing.
+ * A value of 1 means that there is no back off from the local maxima, and means that the algorithm will fail to
+ * reduce window size to something lower than a previous maxima. This value is capped to the [0, 1] range.
+ *
+ * @param windowSizeBackOff The back off to set.
+ * @return This, to allow chaining.
+ */
+ public DynamicThrottlePolicy setWindowSizeBackOff(double windowSizeBackOff) {
+ this.windowSizeBackOff = Math.max(0, Math.min(1, windowSizeBackOff));
+ return this;
+ }
+
+ /**
+ * Sets the rate at which the window size is updated. The larger the value, the less responsive the resizing
+ * becomes. However, the smaller the value, the less accurate the measurements become.
+ *
+ * @param resizeRate The rate to set.
+ * @return This, to allow chaining.
+ */
+ public DynamicThrottlePolicy setResizeRate(double resizeRate) {
+ this.resizeRate = resizeRate;
+ return this;
+ }
+
+ /**
+ * Sets the weight for this client. The larger the value, the more resources
+ * will be allocated to this clients. Resources are shared between clients
+ * proportiannally to their weights.
+ *
+ * @param weight The weight to set.
+ * @return This, to allow chaining.
+ */
+ public DynamicThrottlePolicy setWeight(double weight) {
+ this.weight = weight;
+ return this;
+ }
+
+ /**
+ * Sets the maximium number of pending operations allowed at any time, in
+ * order to avoid using too much resources.
+ *
+ * @param max The max to set.
+ * @return This, to allow chaining.
+ */
+ public DynamicThrottlePolicy setMaxWindowSize(double max) {
+ this.maxWindowSize = max;
+ return this;
+ }
+
+ /**
+ * Get the maximum number of pending operations allowed at any time.
+ *
+ * @return The maximum number of operations.
+ */
+ public double getMaxWindowSize() {
+ return maxWindowSize;
+ }
+
+
+ /**
+ * Sets the minimium number of pending operations allowed at any time, in
+ * order to keep a level of performance.
+ *
+ * @param min The min to set.
+ * @return This, to allow chaining.
+ */
+ public DynamicThrottlePolicy setMinWindowSize(double min) {
+ this.minWindowSize = min;
+ return this;
+ }
+
+ /**
+ * Get the minimum number of pending operations allowed at any time.
+ *
+ * @return The minimum number of operations.
+ */
+ public double getMinWindowSize() {
+ return minWindowSize;
+ }
+
+ public DynamicThrottlePolicy setMaxPendingCount(int maxCount) {
+ super.setMaxPendingCount(maxCount);
+ maxWindowSize = maxCount;
+ return this;
+ }
+
+
+ /**
+ * Returns the maximum number of pending messages allowed.
+ *
+ * @return The max limit.
+ */
+ public int getMaxPendingCount() {
+ return (int)windowSize;
+ }
+
+}
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java b/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java
index cf5beb4a903..91d3ba966c3 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java
@@ -35,7 +35,7 @@ import java.util.logging.Logger;
* messaging semantics of each hop.</p>
*
* The responsibilities of a message bus are:
- * <ul>
+ * <ul>
* <li>Assign a route to every send message from its routing table
* <li>Deliver every message it <i>accepts</i> to the next hop on its route
* <i>or</i> deliver a <i>failure reply</i>.
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/NetworkMessageBus.java b/messagebus/src/main/java/com/yahoo/messagebus/NetworkMessageBus.java
index 24e177f1fbf..310bbf2c3a6 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/NetworkMessageBus.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/NetworkMessageBus.java
@@ -7,7 +7,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
/**
* The combination of a messagebus and a network over which it may send data.
- *
+ *
* @author bratseth
*/
public class NetworkMessageBus {
@@ -16,12 +16,12 @@ public class NetworkMessageBus {
private final MessageBus messageBus;
private final AtomicBoolean destroyed = new AtomicBoolean(false);
-
+
public NetworkMessageBus(Network network, MessageBus messageBus) {
this.network = network;
this.messageBus = messageBus;
}
-
+
/** Returns the contained message bus object */
public MessageBus getMessageBus() { return messageBus; }
@@ -30,14 +30,14 @@ public class NetworkMessageBus {
/**
* Irreversibly destroys the content of this.
- *
+ *
* @return whether this destroyed anything, or if it was already destroyed
*/
public boolean destroy() {
if ( destroyed.getAndSet(true)) return false;
-
+
getMessageBus().destroy();
return true;
}
-
+
}
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/ProtocolRepository.java b/messagebus/src/main/java/com/yahoo/messagebus/ProtocolRepository.java
index 949a8486fb7..503119d9350 100755
--- a/messagebus/src/main/java/com/yahoo/messagebus/ProtocolRepository.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/ProtocolRepository.java
@@ -1,109 +1,109 @@
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.messagebus;
-
-import com.yahoo.concurrent.CopyOnWriteHashMap;
-import com.yahoo.log.LogLevel;
-import com.yahoo.messagebus.routing.RoutingPolicy;
-import com.yahoo.text.Utf8String;
-
-import java.util.logging.Logger;
-
-/**
- * Implements a thread-safe repository for protocols and their routing policies. This manages an internal cache of
- * routing policies so that similarly referenced policy directives share the same instance of a policy.
- *
- * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
- */
-public class ProtocolRepository {
-
- private static final Logger log = Logger.getLogger(ProtocolRepository.class.getName());
- private final CopyOnWriteHashMap<String, Protocol> protocols = new CopyOnWriteHashMap<>();
- private final CopyOnWriteHashMap<String, RoutingPolicy> routingPolicyCache = new CopyOnWriteHashMap<>();
-
- /**
- * Registers a protocol with this repository. This will overwrite any protocol that was registered earlier that has
- * the same name. If this method detects a protocol replacement, it will clear its internal routing policy cache.
- *
- * @param protocol The protocol to register.
- */
- public void putProtocol(Protocol protocol) {
- if (protocols.put(protocol.getName(), protocol) != null) {
- routingPolicyCache.clear();
- }
- }
-
- /**
- * Returns whether or not this repository contains a protocol with the given name. Given the concurrent nature of
- * things, one should not invoke this method followed by {@link #getProtocol(String)} and expect the return value to
- * be non-null. Instead just get the protocol and compare it to null.
- *
- * @param name The name to check for.
- * @return True if the named protocol is registered.
- */
- public boolean hasProtocol(String name) {
- return protocols.containsKey(name);
- }
-
- /**
- * Returns the protocol whose name matches the given argument. This method will return null if no such protocol has
- * been registered.
- *
- * @param name The name of the protocol to return.
- * @return The protocol registered, or null.
- */
- public Protocol getProtocol(String name) {
- return protocols.get(name);
- }
-
- /**
- * Creates and returns a routing policy that matches the given arguments. If a routing policy has been created
- * previously using the exact same parameters, this method will returned that cached instance instead of creating
- * another. Not that when you replace a protocol using {@link #putProtocol(Protocol)} the policy cache is cleared.
- *
- * @param protocolName The name of the protocol whose routing policy to create.
- * @param policyName The name of the routing policy to create.
- * @param policyParam The parameter to pass to the routing policy constructor.
- * @return The created routing policy.
- */
- public RoutingPolicy getRoutingPolicy(String protocolName, String policyName, String policyParam) {
- String cacheKey = protocolName + "." + policyName + "." + policyParam;
- RoutingPolicy ret = routingPolicyCache.get(cacheKey);
- if (ret != null) {
- return ret;
- }
- synchronized (this) {
- Protocol protocol = getProtocol(protocolName);
- if (protocol == null) {
- log.log(LogLevel.ERROR, "Protocol '" + protocolName + "' not supported.");
- return null;
- }
- try {
- ret = protocol.createPolicy(policyName, policyParam);
- } catch (RuntimeException e) {
- log.log(LogLevel.ERROR, "Protcol '" + protocolName + "' threw an exception: " + e.getMessage(), e);
- return null;
- }
- if (ret == null) {
- log.log(LogLevel.ERROR, "Protocol '" + protocolName + "' failed to create routing policy '" + policyName +
- "' with parameter '" + policyParam + "'.");
- return null;
- }
- routingPolicyCache.put(cacheKey, ret);
- }
- return ret;
- }
-
- public final RoutingPolicy getRoutingPolicy(Utf8String protocolName, String policyName, String policyParam) {
- return getRoutingPolicy(protocolName.toString(), policyName, policyParam);
- }
-
- /**
- * Clears the internal cache of routing policies.
- */
- public synchronized void clearPolicyCache() {
- for (RoutingPolicy policy : routingPolicyCache.values()) {
- policy.destroy();
- }
- routingPolicyCache.clear();
- }
-}
+package com.yahoo.messagebus;
+
+import com.yahoo.concurrent.CopyOnWriteHashMap;
+import com.yahoo.log.LogLevel;
+import com.yahoo.messagebus.routing.RoutingPolicy;
+import com.yahoo.text.Utf8String;
+
+import java.util.logging.Logger;
+
+/**
+ * Implements a thread-safe repository for protocols and their routing policies. This manages an internal cache of
+ * routing policies so that similarly referenced policy directives share the same instance of a policy.
+ *
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public class ProtocolRepository {
+
+ private static final Logger log = Logger.getLogger(ProtocolRepository.class.getName());
+ private final CopyOnWriteHashMap<String, Protocol> protocols = new CopyOnWriteHashMap<>();
+ private final CopyOnWriteHashMap<String, RoutingPolicy> routingPolicyCache = new CopyOnWriteHashMap<>();
+
+ /**
+ * Registers a protocol with this repository. This will overwrite any protocol that was registered earlier that has
+ * the same name. If this method detects a protocol replacement, it will clear its internal routing policy cache.
+ *
+ * @param protocol The protocol to register.
+ */
+ public void putProtocol(Protocol protocol) {
+ if (protocols.put(protocol.getName(), protocol) != null) {
+ routingPolicyCache.clear();
+ }
+ }
+
+ /**
+ * Returns whether or not this repository contains a protocol with the given name. Given the concurrent nature of
+ * things, one should not invoke this method followed by {@link #getProtocol(String)} and expect the return value to
+ * be non-null. Instead just get the protocol and compare it to null.
+ *
+ * @param name The name to check for.
+ * @return True if the named protocol is registered.
+ */
+ public boolean hasProtocol(String name) {
+ return protocols.containsKey(name);
+ }
+
+ /**
+ * Returns the protocol whose name matches the given argument. This method will return null if no such protocol has
+ * been registered.
+ *
+ * @param name The name of the protocol to return.
+ * @return The protocol registered, or null.
+ */
+ public Protocol getProtocol(String name) {
+ return protocols.get(name);
+ }
+
+ /**
+ * Creates and returns a routing policy that matches the given arguments. If a routing policy has been created
+ * previously using the exact same parameters, this method will returned that cached instance instead of creating
+ * another. Not that when you replace a protocol using {@link #putProtocol(Protocol)} the policy cache is cleared.
+ *
+ * @param protocolName The name of the protocol whose routing policy to create.
+ * @param policyName The name of the routing policy to create.
+ * @param policyParam The parameter to pass to the routing policy constructor.
+ * @return The created routing policy.
+ */
+ public RoutingPolicy getRoutingPolicy(String protocolName, String policyName, String policyParam) {
+ String cacheKey = protocolName + "." + policyName + "." + policyParam;
+ RoutingPolicy ret = routingPolicyCache.get(cacheKey);
+ if (ret != null) {
+ return ret;
+ }
+ synchronized (this) {
+ Protocol protocol = getProtocol(protocolName);
+ if (protocol == null) {
+ log.log(LogLevel.ERROR, "Protocol '" + protocolName + "' not supported.");
+ return null;
+ }
+ try {
+ ret = protocol.createPolicy(policyName, policyParam);
+ } catch (RuntimeException e) {
+ log.log(LogLevel.ERROR, "Protcol '" + protocolName + "' threw an exception: " + e.getMessage(), e);
+ return null;
+ }
+ if (ret == null) {
+ log.log(LogLevel.ERROR, "Protocol '" + protocolName + "' failed to create routing policy '" + policyName +
+ "' with parameter '" + policyParam + "'.");
+ return null;
+ }
+ routingPolicyCache.put(cacheKey, ret);
+ }
+ return ret;
+ }
+
+ public final RoutingPolicy getRoutingPolicy(Utf8String protocolName, String policyName, String policyParam) {
+ return getRoutingPolicy(protocolName.toString(), policyName, policyParam);
+ }
+
+ /**
+ * Clears the internal cache of routing policies.
+ */
+ public synchronized void clearPolicyCache() {
+ for (RoutingPolicy policy : routingPolicyCache.values()) {
+ policy.destroy();
+ }
+ routingPolicyCache.clear();
+ }
+}
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/RPCMessageBus.java b/messagebus/src/main/java/com/yahoo/messagebus/RPCMessageBus.java
index cfa50a35549..b0eb56ce657 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/RPCMessageBus.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/RPCMessageBus.java
@@ -33,12 +33,12 @@ public class RPCMessageBus extends NetworkMessageBus {
public RPCMessageBus(MessageBusParams mbusParams, RPCNetworkParams rpcParams, String routingCfgId) {
this(mbusParams, new RPCNetwork(rpcParams), routingCfgId);
}
-
+
private RPCMessageBus(MessageBusParams mbusParams, RPCNetwork network, String routingCfgId) {
this(new MessageBus(network, mbusParams), network, routingCfgId);
}
-
- private RPCMessageBus(MessageBus messageBus, RPCNetwork network, String routingCfgId) {
+
+ private RPCMessageBus(MessageBus messageBus, RPCNetwork network, String routingCfgId) {
super(network, messageBus);
configAgent = new ConfigAgent(routingCfgId != null ? routingCfgId : "client", messageBus);
configAgent.subscribe();
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/SendProxy.java b/messagebus/src/main/java/com/yahoo/messagebus/SendProxy.java
index fe894427a13..aa091cc1f29 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/SendProxy.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/SendProxy.java
@@ -1,93 +1,93 @@
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.messagebus;
-
-import com.yahoo.concurrent.SystemTimer;
-import com.yahoo.messagebus.metrics.RouteMetricSet;
-import com.yahoo.messagebus.network.Network;
-import com.yahoo.messagebus.routing.Resender;
-import com.yahoo.messagebus.routing.RoutingNode;
-import com.yahoo.log.LogLevel;
-
-import java.util.logging.Logger;
-
-/**
- * This class owns a message that is being sent by message bus. Once a reply is received, the message is attached to it
- * and returned to the application. This also implements the discard policy of {@link RoutingNode}.
- *
- * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
- */
-public class SendProxy implements MessageHandler, ReplyHandler {
-
- private static final Logger log = Logger.getLogger(SendProxy.class.getName());
- private final MessageBus mbus;
- private final Network net;
- private final Resender resender;
- private Message msg = null;
- private boolean logTrace = false;
- private long sendTime = 0;
-
- /**
- * Constructs a new instance of this class to maintain sending of a single message.
- *
- * @param mbus The message bus that owns this.
- * @param net The network layer to transmit through.
- * @param resender The resender to use.
- */
- public SendProxy(MessageBus mbus, Network net, Resender resender) {
- this.mbus = mbus;
- this.net = net;
- this.resender = resender;
- sendTime = SystemTimer.INSTANCE.milliTime();
- }
-
- public void handleMessage(Message msg) {
- Trace trace = msg.getTrace();
- if (trace.getLevel() == 0) {
- if (log.isLoggable(LogLevel.SPAM)) {
- trace.setLevel(9);
- logTrace = true;
- } else if (log.isLoggable(LogLevel.DEBUG)) {
- trace.setLevel(6);
- logTrace = true;
- }
- }
- this.msg = msg;
- RoutingNode root = new RoutingNode(mbus, net, resender, this, msg);
- root.send();
- }
-
- public void handleReply(Reply reply) {
- if (reply == null) {
- msg.discard();
- } else {
- Trace trace = msg.getTrace();
- if (logTrace) {
- if (reply.hasErrors()) {
- log.log(LogLevel.DEBUG, "Trace for reply with error(s):\n" + reply.getTrace());
- } else if (log.isLoggable(LogLevel.SPAM)) {
- log.log(LogLevel.SPAM, "Trace for reply:\n" + reply.getTrace());
- }
- Trace empty = new Trace();
- trace.swap(empty);
- } else if (trace.getLevel() > 0) {
- trace.getRoot().addChild(reply.getTrace().getRoot());
- trace.getRoot().normalize();
- }
- reply.swapState(msg);
- reply.setMessage(msg);
-
- if (msg.getRoute() != null) {
- RouteMetricSet metrics = mbus.getMetrics().getRouteMetrics(msg.getRoute());
- for (int i = 0; i < reply.getNumErrors(); i++) {
- metrics.addFailure(reply.getError(i));
- }
- if (reply.getNumErrors() == 0) {
- metrics.latency.addValue(msg.getTimeReceived() - sendTime);
- }
- }
-
- ReplyHandler handler = reply.popHandler();
- handler.handleReply(reply);
- }
- }
-}
+package com.yahoo.messagebus;
+
+import com.yahoo.concurrent.SystemTimer;
+import com.yahoo.messagebus.metrics.RouteMetricSet;
+import com.yahoo.messagebus.network.Network;
+import com.yahoo.messagebus.routing.Resender;
+import com.yahoo.messagebus.routing.RoutingNode;
+import com.yahoo.log.LogLevel;
+
+import java.util.logging.Logger;
+
+/**
+ * This class owns a message that is being sent by message bus. Once a reply is received, the message is attached to it
+ * and returned to the application. This also implements the discard policy of {@link RoutingNode}.
+ *
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public class SendProxy implements MessageHandler, ReplyHandler {
+
+ private static final Logger log = Logger.getLogger(SendProxy.class.getName());
+ private final MessageBus mbus;
+ private final Network net;
+ private final Resender resender;
+ private Message msg = null;
+ private boolean logTrace = false;
+ private long sendTime = 0;
+
+ /**
+ * Constructs a new instance of this class to maintain sending of a single message.
+ *
+ * @param mbus The message bus that owns this.
+ * @param net The network layer to transmit through.
+ * @param resender The resender to use.
+ */
+ public SendProxy(MessageBus mbus, Network net, Resender resender) {
+ this.mbus = mbus;
+ this.net = net;
+ this.resender = resender;
+ sendTime = SystemTimer.INSTANCE.milliTime();
+ }
+
+ public void handleMessage(Message msg) {
+ Trace trace = msg.getTrace();
+ if (trace.getLevel() == 0) {
+ if (log.isLoggable(LogLevel.SPAM)) {
+ trace.setLevel(9);
+ logTrace = true;
+ } else if (log.isLoggable(LogLevel.DEBUG)) {
+ trace.setLevel(6);
+ logTrace = true;
+ }
+ }
+ this.msg = msg;
+ RoutingNode root = new RoutingNode(mbus, net, resender, this, msg);
+ root.send();
+ }
+
+ public void handleReply(Reply reply) {
+ if (reply == null) {
+ msg.discard();
+ } else {
+ Trace trace = msg.getTrace();
+ if (logTrace) {
+ if (reply.hasErrors()) {
+ log.log(LogLevel.DEBUG, "Trace for reply with error(s):\n" + reply.getTrace());
+ } else if (log.isLoggable(LogLevel.SPAM)) {
+ log.log(LogLevel.SPAM, "Trace for reply:\n" + reply.getTrace());
+ }
+ Trace empty = new Trace();
+ trace.swap(empty);
+ } else if (trace.getLevel() > 0) {
+ trace.getRoot().addChild(reply.getTrace().getRoot());
+ trace.getRoot().normalize();
+ }
+ reply.swapState(msg);
+ reply.setMessage(msg);
+
+ if (msg.getRoute() != null) {
+ RouteMetricSet metrics = mbus.getMetrics().getRouteMetrics(msg.getRoute());
+ for (int i = 0; i < reply.getNumErrors(); i++) {
+ metrics.addFailure(reply.getError(i));
+ }
+ if (reply.getNumErrors() == 0) {
+ metrics.latency.addValue(msg.getTimeReceived() - sendTime);
+ }
+ }
+
+ ReplyHandler handler = reply.popHandler();
+ handler.handleReply(reply);
+ }
+ }
+}
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/StaticThrottlePolicy.java b/messagebus/src/main/java/com/yahoo/messagebus/StaticThrottlePolicy.java
index bb1ae9e69b3..f65b165a768 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/StaticThrottlePolicy.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/StaticThrottlePolicy.java
@@ -1,91 +1,91 @@
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.messagebus;
-
-/**
- * This is an implementatin of the {@link ThrottlePolicy} that offers static limits to the amount of pending data a
- * {@link SourceSession} is allowed to have. You may choose to set a limit to the total number of pending messages (by
- * way of {@link #setMaxPendingCount(int)}), the total size of pending messages (by way of {@link
- * #setMaxPendingSize(long)}), or some combination thereof.
- *
- * <b>NOTE:</b> By context, "pending" is refering to the number of sent messages that have not been replied to yet.
- *
- * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
- */
-public class StaticThrottlePolicy implements ThrottlePolicy {
-
- private int maxPendingCount = 0;
- private long maxPendingSize = 0;
- private long pendingSize = 0;
-
- public boolean canSend(Message msg, int pendingCount) {
- if (maxPendingCount > 0 && pendingCount >= maxPendingCount) {
- return false;
- }
- if (maxPendingSize > 0 && pendingSize >= maxPendingSize) {
- return false;
- }
- return true;
- }
-
- public void processMessage(Message msg) {
- int size = msg.getApproxSize();
- msg.setContext(size);
- pendingSize += size;
- }
-
- public void processReply(Reply reply) {
- int size = (Integer)reply.getContext();
- pendingSize -= size;
- }
-
- /**
- * Returns the maximum number of pending messages allowed.
- *
- * @return The max limit.
- */
- public int getMaxPendingCount() {
- return maxPendingCount;
- }
-
- /**
- * Sets the maximum number of pending messages allowed.
- *
- * @param maxCount The max count.
- * @return This, to allow chaining.
- */
- public StaticThrottlePolicy setMaxPendingCount(int maxCount) {
- maxPendingCount = maxCount;
- return this;
- }
-
- /**
- * Returns the maximum total size of pending messages allowed.
- *
- * @return The max limit.
- */
- public long getMaxPendingSize() {
- return maxPendingSize;
- }
-
- /**
- * Sets the maximum total size of pending messages allowed. This size is relative to the value returned by {@link
- * com.yahoo.messagebus.Message#getApproxSize()}.
- *
- * @param maxSize The max size.
- * @return This, to allow chaining.
- */
- public StaticThrottlePolicy setMaxPendingSize(long maxSize) {
- maxPendingSize = maxSize;
- return this;
- }
-
- /**
- * Returns the total size of pending messages.
- *
- * @return The size.
- */
- public long getPendingSize() {
- return pendingSize;
- }
-
-}
+package com.yahoo.messagebus;
+
+/**
+ * This is an implementatin of the {@link ThrottlePolicy} that offers static limits to the amount of pending data a
+ * {@link SourceSession} is allowed to have. You may choose to set a limit to the total number of pending messages (by
+ * way of {@link #setMaxPendingCount(int)}), the total size of pending messages (by way of {@link
+ * #setMaxPendingSize(long)}), or some combination thereof.
+ *
+ * <b>NOTE:</b> By context, "pending" is refering to the number of sent messages that have not been replied to yet.
+ *
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public class StaticThrottlePolicy implements ThrottlePolicy {
+
+ private int maxPendingCount = 0;
+ private long maxPendingSize = 0;
+ private long pendingSize = 0;
+
+ public boolean canSend(Message msg, int pendingCount) {
+ if (maxPendingCount > 0 && pendingCount >= maxPendingCount) {
+ return false;
+ }
+ if (maxPendingSize > 0 && pendingSize >= maxPendingSize) {
+ return false;
+ }
+ return true;
+ }
+
+ public void processMessage(Message msg) {
+ int size = msg.getApproxSize();
+ msg.setContext(size);
+ pendingSize += size;
+ }
+
+ public void processReply(Reply reply) {
+ int size = (Integer)reply.getContext();
+ pendingSize -= size;
+ }
+
+ /**
+ * Returns the maximum number of pending messages allowed.
+ *
+ * @return The max limit.
+ */
+ public int getMaxPendingCount() {
+ return maxPendingCount;
+ }
+
+ /**
+ * Sets the maximum number of pending messages allowed.
+ *
+ * @param maxCount The max count.
+ * @return This, to allow chaining.
+ */
+ public StaticThrottlePolicy setMaxPendingCount(int maxCount) {
+ maxPendingCount = maxCount;
+ return this;
+ }
+
+ /**
+ * Returns the maximum total size of pending messages allowed.
+ *
+ * @return The max limit.
+ */
+ public long getMaxPendingSize() {
+ return maxPendingSize;
+ }
+
+ /**
+ * Sets the maximum total size of pending messages allowed. This size is relative to the value returned by {@link
+ * com.yahoo.messagebus.Message#getApproxSize()}.
+ *
+ * @param maxSize The max size.
+ * @return This, to allow chaining.
+ */
+ public StaticThrottlePolicy setMaxPendingSize(long maxSize) {
+ maxPendingSize = maxSize;
+ return this;
+ }
+
+ /**
+ * Returns the total size of pending messages.
+ *
+ * @return The size.
+ */
+ public long getPendingSize() {
+ return pendingSize;
+ }
+
+}
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/ThrottlePolicy.java b/messagebus/src/main/java/com/yahoo/messagebus/ThrottlePolicy.java
index 611694b0c62..6b095060427 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/ThrottlePolicy.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/ThrottlePolicy.java
@@ -1,36 +1,36 @@
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.messagebus;
-
-/**
- * An implementation of this interface is used by {@link SourceSession} to throttle output. Every message entering
- * {@link SourceSession#send(Message)} needs to be accepted by this interface's {@link #canSend(Message, int)} method.
- * All messages accepted are passed through the {@link #processMessage(Message)} method, and the corresponding replies
- * are passed through the {@link #processReply(Reply)} method.
- *
- * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
- */
-public interface ThrottlePolicy {
-
- /**
- * Returns whether or not the given message can be sent according to the current state of this policy.
- *
- * @param msg The message to evaluate.
- * @param pendingCount The current number of pending messages.
- * @return True to send the message.
- */
- public boolean canSend(Message msg, int pendingCount);
-
- /**
- * This method is called once for every message that was accepted by {@link #canSend(Message, int)} and sent.
- *
- * @param msg The message beint sent.
- */
- public void processMessage(Message msg);
-
- /**
- * This method is called once for every reply that is received.
- *
- * @param reply The reply received.
- */
- public void processReply(Reply reply);
-}
+package com.yahoo.messagebus;
+
+/**
+ * An implementation of this interface is used by {@link SourceSession} to throttle output. Every message entering
+ * {@link SourceSession#send(Message)} needs to be accepted by this interface's {@link #canSend(Message, int)} method.
+ * All messages accepted are passed through the {@link #processMessage(Message)} method, and the corresponding replies
+ * are passed through the {@link #processReply(Reply)} method.
+ *
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public interface ThrottlePolicy {
+
+ /**
+ * Returns whether or not the given message can be sent according to the current state of this policy.
+ *
+ * @param msg The message to evaluate.
+ * @param pendingCount The current number of pending messages.
+ * @return True to send the message.
+ */
+ public boolean canSend(Message msg, int pendingCount);
+
+ /**
+ * This method is called once for every message that was accepted by {@link #canSend(Message, int)} and sent.
+ *
+ * @param msg The message beint sent.
+ */
+ public void processMessage(Message msg);
+
+ /**
+ * This method is called once for every reply that is received.
+ *
+ * @param reply The reply received.
+ */
+ public void processReply(Reply reply);
+}
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/TraceLevel.java b/messagebus/src/main/java/com/yahoo/messagebus/TraceLevel.java
index 994eb17b434..07b91a3280f 100755
--- a/messagebus/src/main/java/com/yahoo/messagebus/TraceLevel.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/TraceLevel.java
@@ -1,30 +1,30 @@
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.messagebus;
-
-/**
- * This class defines the {@link Trace} levels used by message bus.
- *
- * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
- */
-public final class TraceLevel {
-
- /**
- * Traces whenever an Error is added to a Reply.
- */
- public static final int ERROR = 1;
-
- /**
- * Traces sending and receiving messages and replies on network level.
- */
- public static final int SEND_RECEIVE = 4;
-
- /**
- * Traces splitting messages and merging replies.
- */
- public static final int SPLIT_MERGE = 5;
-
- /**
- * Traces information about which internal components are processing a routable.
- */
- public static final int COMPONENT = 6;
+package com.yahoo.messagebus;
+
+/**
+ * This class defines the {@link Trace} levels used by message bus.
+ *
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public final class TraceLevel {
+
+ /**
+ * Traces whenever an Error is added to a Reply.
+ */
+ public static final int ERROR = 1;
+
+ /**
+ * Traces sending and receiving messages and replies on network level.
+ */
+ public static final int SEND_RECEIVE = 4;
+
+ /**
+ * Traces splitting messages and merging replies.
+ */
+ public static final int SPLIT_MERGE = 5;
+
+ /**
+ * Traces information about which internal components are processing a routable.
+ */
+ public static final int COMPONENT = 6;
} \ No newline at end of file
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalNetwork.java b/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalNetwork.java
index 78cf352cfbf..efe96c5babb 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalNetwork.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalNetwork.java
@@ -35,7 +35,7 @@ public class LocalNetwork implements Network {
public LocalNetwork() {
this(new LocalWire());
}
-
+
public LocalNetwork(LocalWire wire) {
this.wire = wire;
this.hostId = wire.newHostId();
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/routing/ApplicationSpec.java b/messagebus/src/main/java/com/yahoo/messagebus/routing/ApplicationSpec.java
index 47f688e0f51..adc393a77c7 100755
--- a/messagebus/src/main/java/com/yahoo/messagebus/routing/ApplicationSpec.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/routing/ApplicationSpec.java
@@ -1,131 +1,131 @@
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.messagebus.routing;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.regex.Pattern;
-
-/**
- * This class holds the specifications of an application running message bus services. It is used for ensuring that a
- * {@link RoutingSpec} holds valid routing specifications.
- *
- * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
- */
-public class ApplicationSpec {
-
- private final HashMap<String, HashSet<String>> services = new HashMap<String, HashSet<String>>();
-
- /**
- * Constructs a new instance of this class.
- */
- public ApplicationSpec() {
- // empty
- }
-
- /**
- * Implements the copy constructor.
- *
- * @param obj The object to copy.
- */
- public ApplicationSpec(ApplicationSpec obj) {
- add(obj);
- }
-
- /**
- * Adds the content of the given application to this.
- *
- * @param app The application whose content to copy.
- * @return This, to allow chaining.
- */
- public ApplicationSpec add(ApplicationSpec app) {
- for (Map.Entry<String, HashSet<String>> entry : app.services.entrySet()) {
- String protocol = entry.getKey();
- for (String service : entry.getValue()) {
- addService(protocol, service);
- }
- }
- return this;
- }
-
- /**
- * Adds a service name to the list of known services.
- *
- * @param protocol The protocol for which to add the service.
- * @param name The service to add.
- * @return This, to allow chaining.
- */
- public ApplicationSpec addService(String protocol, String name) {
- if (!services.containsKey(protocol)) {
- services.put(protocol, new HashSet<String>());
- }
- services.get(protocol).add(name);
- return this;
- }
-
- /**
- * Determines whether or not the given service pattern matches any of the known services.
- *
- * @param protocol The protocol whose services to check.
- * @param pattern The pattern to match.
- * @return True if at least one service was found.
- */
- public boolean isService(String protocol, String pattern) {
- if (services.containsKey(protocol)) {
- Pattern regex = toRegex(pattern);
- for (String service : services.get(protocol)) {
- if (regex.matcher(service).find()) {
- return true;
- }
- }
- }
- return false;
- }
-
- /**
- * Converts the given string pattern to a usable regex pattern object.
- *
- * @param pattern The string pattern to convert.
- * @return The corresponding regex pattern object.
- */
- private static Pattern toRegex(String pattern) {
- StringBuilder ret = new StringBuilder();
- ret.append("^");
- for (int i = 0; i < pattern.length(); i++) {
- ret.append(toRegex(pattern.charAt(i)));
- }
- ret.append("$");
- return Pattern.compile(ret.toString());
- }
-
- /**
- * Converts a single string pattern char to a regex string. This method is invoked by {@link #toRegex(String)} once
- * for each character in the string pattern.
- *
- * @param c The character to convert.
- * @return The corresponding regex pattern string.
- */
- private static String toRegex(char c) {
- switch (c) {
- case '*':
- return ".*";
- case '?':
- return ".";
- case '^':
- case '$':
- case '|':
- case '{':
- case '}':
- case '(':
- case ')':
- case '[':
- case ']':
- case '\\':
- case '+':
- case '.':
- return "\\" + c;
- default:
- return "" + c;
- }
- }
-}
+package com.yahoo.messagebus.routing;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+/**
+ * This class holds the specifications of an application running message bus services. It is used for ensuring that a
+ * {@link RoutingSpec} holds valid routing specifications.
+ *
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public class ApplicationSpec {
+
+ private final HashMap<String, HashSet<String>> services = new HashMap<String, HashSet<String>>();
+
+ /**
+ * Constructs a new instance of this class.
+ */
+ public ApplicationSpec() {
+ // empty
+ }
+
+ /**
+ * Implements the copy constructor.
+ *
+ * @param obj The object to copy.
+ */
+ public ApplicationSpec(ApplicationSpec obj) {
+ add(obj);
+ }
+
+ /**
+ * Adds the content of the given application to this.
+ *
+ * @param app The application whose content to copy.
+ * @return This, to allow chaining.
+ */
+ public ApplicationSpec add(ApplicationSpec app) {
+ for (Map.Entry<String, HashSet<String>> entry : app.services.entrySet()) {
+ String protocol = entry.getKey();
+ for (String service : entry.getValue()) {
+ addService(protocol, service);
+ }
+ }
+ return this;
+ }
+
+ /**
+ * Adds a service name to the list of known services.
+ *
+ * @param protocol The protocol for which to add the service.
+ * @param name The service to add.
+ * @return This, to allow chaining.
+ */
+ public ApplicationSpec addService(String protocol, String name) {
+ if (!services.containsKey(protocol)) {
+ services.put(protocol, new HashSet<String>());
+ }
+ services.get(protocol).add(name);
+ return this;
+ }
+
+ /**
+ * Determines whether or not the given service pattern matches any of the known services.
+ *
+ * @param protocol The protocol whose services to check.
+ * @param pattern The pattern to match.
+ * @return True if at least one service was found.
+ */
+ public boolean isService(String protocol, String pattern) {
+ if (services.containsKey(protocol)) {
+ Pattern regex = toRegex(pattern);
+ for (String service : services.get(protocol)) {
+ if (regex.matcher(service).find()) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Converts the given string pattern to a usable regex pattern object.
+ *
+ * @param pattern The string pattern to convert.
+ * @return The corresponding regex pattern object.
+ */
+ private static Pattern toRegex(String pattern) {
+ StringBuilder ret = new StringBuilder();
+ ret.append("^");
+ for (int i = 0; i < pattern.length(); i++) {
+ ret.append(toRegex(pattern.charAt(i)));
+ }
+ ret.append("$");
+ return Pattern.compile(ret.toString());
+ }
+
+ /**
+ * Converts a single string pattern char to a regex string. This method is invoked by {@link #toRegex(String)} once
+ * for each character in the string pattern.
+ *
+ * @param c The character to convert.
+ * @return The corresponding regex pattern string.
+ */
+ private static String toRegex(char c) {
+ switch (c) {
+ case '*':
+ return ".*";
+ case '?':
+ return ".";
+ case '^':
+ case '$':
+ case '|':
+ case '{':
+ case '}':
+ case '(':
+ case ')':
+ case '[':
+ case ']':
+ case '\\':
+ case '+':
+ case '.':
+ return "\\" + c;
+ default:
+ return "" + c;
+ }
+ }
+}
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/routing/HopDirective.java b/messagebus/src/main/java/com/yahoo/messagebus/routing/HopDirective.java
index 9623b45ca20..89ee638f41c 100755
--- a/messagebus/src/main/java/com/yahoo/messagebus/routing/HopDirective.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/routing/HopDirective.java
@@ -1,26 +1,26 @@
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.messagebus.routing;
-
-/**
- * This class is the base class for the primitives that make up a {@link Hop}'s selector.
- *
- * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
- */
-public interface HopDirective {
-
- /**
- * Returns true if this directive matches another.
- *
- * @param dir The directive to compare this to.
- * @return True if this matches the argument.
- */
- public boolean matches(HopDirective dir);
-
- /**
- * Returns a string representation of this that can be debugged but not parsed.
- *
- * @return The debug string.
- */
- public String toDebugString();
-}
-
+package com.yahoo.messagebus.routing;
+
+/**
+ * This class is the base class for the primitives that make up a {@link Hop}'s selector.
+ *
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public interface HopDirective {
+
+ /**
+ * Returns true if this directive matches another.
+ *
+ * @param dir The directive to compare this to.
+ * @return True if this matches the argument.
+ */
+ public boolean matches(HopDirective dir);
+
+ /**
+ * Returns a string representation of this that can be debugged but not parsed.
+ *
+ * @return The debug string.
+ */
+ public String toDebugString();
+}
+
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/routing/PolicyDirective.java b/messagebus/src/main/java/com/yahoo/messagebus/routing/PolicyDirective.java
index e4b88dca6e7..7dd1966cb7b 100755
--- a/messagebus/src/main/java/com/yahoo/messagebus/routing/PolicyDirective.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/routing/PolicyDirective.java
@@ -1,83 +1,83 @@
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.messagebus.routing;
-
-/**
- * This class represents a policy directive within a {@link Hop}'s selector. This means to create the named protocol
- * using the given parameter string, and the running that protocol within the context of this directive.
- *
- * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
- */
-public class PolicyDirective implements HopDirective {
-
- private final String name;
- private final String param;
-
- /**
- * Constructs a new policy selector item.
- *
- * @param name The name of the policy to invoke.
- * @param param The parameter to pass to the name constructor.
- */
- public PolicyDirective(String name, String param) {
- this.name = name;
- this.param = param;
- }
-
- @Override
- public boolean matches(HopDirective dir) {
- return true;
- }
-
- /**
- * Returns the name of the policy that this item is to invoke.
- *
- * @return The name name.
- */
- public String getName() {
- return name;
- }
-
- /**
- * Returns the parameter string for this policy directive.
- *
- * @return The parameter.
- */
- public String getParam() {
- return param;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (!(obj instanceof PolicyDirective)) {
- return false;
- }
- PolicyDirective rhs = (PolicyDirective)obj;
- if (!name.equals(rhs.name)) {
- return false;
- }
- if (param == null && rhs.param != null) {
- return false;
- }
- if (param != null && !param.equals(rhs.param)) {
- return false;
- }
- return true;
- }
-
- @Override
- public String toString() {
- return "[" + name + (param != null && param.length() > 0 ? ":" + param : "") + "]";
- }
-
- @Override
- public String toDebugString() {
- return "PolicyDirective(name = '" + name + "', param = '" + param + "')";
- }
-
- @Override
- public int hashCode() {
- int result = name != null ? name.hashCode() : 0;
- result = 31 * result + (param != null ? param.hashCode() : 0);
- return result;
- }
+package com.yahoo.messagebus.routing;
+
+/**
+ * This class represents a policy directive within a {@link Hop}'s selector. This means to create the named protocol
+ * using the given parameter string, and the running that protocol within the context of this directive.
+ *
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public class PolicyDirective implements HopDirective {
+
+ private final String name;
+ private final String param;
+
+ /**
+ * Constructs a new policy selector item.
+ *
+ * @param name The name of the policy to invoke.
+ * @param param The parameter to pass to the name constructor.
+ */
+ public PolicyDirective(String name, String param) {
+ this.name = name;
+ this.param = param;
+ }
+
+ @Override
+ public boolean matches(HopDirective dir) {
+ return true;
+ }
+
+ /**
+ * Returns the name of the policy that this item is to invoke.
+ *
+ * @return The name name.
+ */
+ public String getName() {
+ return name;
+ }
+
+ /**
+ * Returns the parameter string for this policy directive.
+ *
+ * @return The parameter.
+ */
+ public String getParam() {
+ return param;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof PolicyDirective)) {
+ return false;
+ }
+ PolicyDirective rhs = (PolicyDirective)obj;
+ if (!name.equals(rhs.name)) {
+ return false;
+ }
+ if (param == null && rhs.param != null) {
+ return false;
+ }
+ if (param != null && !param.equals(rhs.param)) {
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ return "[" + name + (param != null && param.length() > 0 ? ":" + param : "") + "]";
+ }
+
+ @Override
+ public String toDebugString() {
+ return "PolicyDirective(name = '" + name + "', param = '" + param + "')";
+ }
+
+ @Override
+ public int hashCode() {
+ int result = name != null ? name.hashCode() : 0;
+ result = 31 * result + (param != null ? param.hashCode() : 0);
+ return result;
+ }
} \ No newline at end of file
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/routing/RetryPolicy.java b/messagebus/src/main/java/com/yahoo/messagebus/routing/RetryPolicy.java
index 19ea949e741..4231c389a5f 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/routing/RetryPolicy.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/routing/RetryPolicy.java
@@ -1,30 +1,30 @@
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.messagebus.routing;
-
-/**
- * When a {@link com.yahoo.messagebus.Reply} containing errors is returned to a {@link com.yahoo.messagebus.MessageBus},
- * an object implementing this interface is consulted on whether or not to resend the corresponding {@link
- * com.yahoo.messagebus.Message}. The policy is passed to the message bus at creation time using the {@link
- * com.yahoo.messagebus.MessageBusParams#setRetryPolicy(RetryPolicy)} method.
- *
- * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
- */
-public interface RetryPolicy {
-
- /**
- * Returns whether or not a {@link com.yahoo.messagebus.Reply} containing an {@link com.yahoo.messagebus.Error} with
- * the given error code can be retried. This method is invoked once for each error in a reply.
- *
- * @param errorCode The code to check.
- * @return True if the message can be resent.
- */
- public boolean canRetry(int errorCode);
-
- /**
- * Returns the number of seconds to delay resending a message.
- *
- * @param retry The retry attempt.
- * @return The delay in seconds.
- */
- public double getRetryDelay(int retry);
-}
+package com.yahoo.messagebus.routing;
+
+/**
+ * When a {@link com.yahoo.messagebus.Reply} containing errors is returned to a {@link com.yahoo.messagebus.MessageBus},
+ * an object implementing this interface is consulted on whether or not to resend the corresponding {@link
+ * com.yahoo.messagebus.Message}. The policy is passed to the message bus at creation time using the {@link
+ * com.yahoo.messagebus.MessageBusParams#setRetryPolicy(RetryPolicy)} method.
+ *
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public interface RetryPolicy {
+
+ /**
+ * Returns whether or not a {@link com.yahoo.messagebus.Reply} containing an {@link com.yahoo.messagebus.Error} with
+ * the given error code can be retried. This method is invoked once for each error in a reply.
+ *
+ * @param errorCode The code to check.
+ * @return True if the message can be resent.
+ */
+ public boolean canRetry(int errorCode);
+
+ /**
+ * Returns the number of seconds to delay resending a message.
+ *
+ * @param retry The retry attempt.
+ * @return The delay in seconds.
+ */
+ public double getRetryDelay(int retry);
+}
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/routing/RetryTransientErrorsPolicy.java b/messagebus/src/main/java/com/yahoo/messagebus/routing/RetryTransientErrorsPolicy.java
index 128ca3e819d..9dd2f7889ec 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/routing/RetryTransientErrorsPolicy.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/routing/RetryTransientErrorsPolicy.java
@@ -1,48 +1,48 @@
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.messagebus.routing;
-
-import com.yahoo.messagebus.ErrorCode;
-
-/**
- * Implements a retry policy that allows resending of any error that is not fatal. It also does progressive back-off,
- * delaying each attempt by the given time multiplied by the retry attempt.
- *
- * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
- */
-public class RetryTransientErrorsPolicy implements RetryPolicy {
-
- private volatile boolean enabled = true;
- private volatile double baseDelay = 1;
-
- /**
- * Sets whether or not this policy should allow retries or not.
- *
- * @param enabled True to allow retries.
- * @return This, to allow chaining.
- */
- public RetryTransientErrorsPolicy setEnabled(boolean enabled) {
- this.enabled = enabled;
- return this;
- }
-
- /**
- * Sets the base delay in seconds to wait between retries. This amount is multiplied by the retry number.
- *
- * @param baseDelay The time in seconds.
- * @return This, to allow chaining.
- */
- public RetryTransientErrorsPolicy setBaseDelay(double baseDelay) {
- this.baseDelay = baseDelay;
- return this;
- }
-
- @Override
- public boolean canRetry(int errorCode) {
- return enabled && errorCode < ErrorCode.FATAL_ERROR;
- }
-
- @Override
- public double getRetryDelay(int retry) {
- return baseDelay * retry;
- }
-}
+package com.yahoo.messagebus.routing;
+
+import com.yahoo.messagebus.ErrorCode;
+
+/**
+ * Implements a retry policy that allows resending of any error that is not fatal. It also does progressive back-off,
+ * delaying each attempt by the given time multiplied by the retry attempt.
+ *
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public class RetryTransientErrorsPolicy implements RetryPolicy {
+
+ private volatile boolean enabled = true;
+ private volatile double baseDelay = 1;
+
+ /**
+ * Sets whether or not this policy should allow retries or not.
+ *
+ * @param enabled True to allow retries.
+ * @return This, to allow chaining.
+ */
+ public RetryTransientErrorsPolicy setEnabled(boolean enabled) {
+ this.enabled = enabled;
+ return this;
+ }
+
+ /**
+ * Sets the base delay in seconds to wait between retries. This amount is multiplied by the retry number.
+ *
+ * @param baseDelay The time in seconds.
+ * @return This, to allow chaining.
+ */
+ public RetryTransientErrorsPolicy setBaseDelay(double baseDelay) {
+ this.baseDelay = baseDelay;
+ return this;
+ }
+
+ @Override
+ public boolean canRetry(int errorCode) {
+ return enabled && errorCode < ErrorCode.FATAL_ERROR;
+ }
+
+ @Override
+ public double getRetryDelay(int retry) {
+ return baseDelay * retry;
+ }
+}
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/routing/RouteDirective.java b/messagebus/src/main/java/com/yahoo/messagebus/routing/RouteDirective.java
index 6ea79a256e0..71abfc5376e 100755
--- a/messagebus/src/main/java/com/yahoo/messagebus/routing/RouteDirective.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/routing/RouteDirective.java
@@ -1,63 +1,63 @@
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.messagebus.routing;
-
-/**
- * This class represents a route directive within a {@link Hop}'s selector. This will be replaced by the named route
- * when evaluated. If the route is not present in the running protocol's routing table, routing will fail.
- *
- * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
- */
-public class RouteDirective implements HopDirective {
-
- private final String name;
-
- /**
- * Constructs a new directive to insert a route.
- *
- * @param name The name of the route to insert.
- */
- public RouteDirective(String name) {
- this.name = name;
- }
-
- @Override
- public boolean matches(HopDirective dir) {
- return dir instanceof RouteDirective && name.equals(((RouteDirective)dir).name);
- }
-
- /**
- * Returns the name of the route to insert.
- *
- * @return The route name.
- */
- public String getName() {
- return name;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (!(obj instanceof RouteDirective)) {
- return false;
- }
- RouteDirective rhs = (RouteDirective)obj;
- if (!name.equals(rhs.name)) {
- return false;
- }
- return true;
- }
-
- @Override
- public String toString() {
- return "route:" + name;
- }
-
- @Override
- public String toDebugString() {
- return "RouteDirective(name = '" + name + "')";
- }
-
- @Override
- public int hashCode() {
- return name != null ? name.hashCode() : 0;
- }
-}
+package com.yahoo.messagebus.routing;
+
+/**
+ * This class represents a route directive within a {@link Hop}'s selector. This will be replaced by the named route
+ * when evaluated. If the route is not present in the running protocol's routing table, routing will fail.
+ *
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public class RouteDirective implements HopDirective {
+
+ private final String name;
+
+ /**
+ * Constructs a new directive to insert a route.
+ *
+ * @param name The name of the route to insert.
+ */
+ public RouteDirective(String name) {
+ this.name = name;
+ }
+
+ @Override
+ public boolean matches(HopDirective dir) {
+ return dir instanceof RouteDirective && name.equals(((RouteDirective)dir).name);
+ }
+
+ /**
+ * Returns the name of the route to insert.
+ *
+ * @return The route name.
+ */
+ public String getName() {
+ return name;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof RouteDirective)) {
+ return false;
+ }
+ RouteDirective rhs = (RouteDirective)obj;
+ if (!name.equals(rhs.name)) {
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ return "route:" + name;
+ }
+
+ @Override
+ public String toDebugString() {
+ return "RouteDirective(name = '" + name + "')";
+ }
+
+ @Override
+ public int hashCode() {
+ return name != null ? name.hashCode() : 0;
+ }
+}
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/routing/RoutingNodeIterator.java b/messagebus/src/main/java/com/yahoo/messagebus/routing/RoutingNodeIterator.java
index dbf152fdfcf..e8caf3ad9b1 100755
--- a/messagebus/src/main/java/com/yahoo/messagebus/routing/RoutingNodeIterator.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/routing/RoutingNodeIterator.java
@@ -1,107 +1,107 @@
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.messagebus.routing;
-
-import com.yahoo.messagebus.Reply;
-
-import java.util.Iterator;
-import java.util.List;
-
-/**
- * Implements an iterator for routing nodes. Use {@link RoutingContext#getChildIterator()} to retrieve an instance of
- * this.
- *
- * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
- */
-public class RoutingNodeIterator {
-
- // The underlying iterator.
- private Iterator<RoutingNode> it;
-
- // The current child entry.
- private RoutingNode entry;
-
- /**
- * Constructs a new iterator based on a given list.
- *
- * @param children The list to iterate through.
- */
- public RoutingNodeIterator(List<RoutingNode> children) {
- it = children.iterator();
- next();
- }
-
- /**
- * Steps to the next child in the map.
- *
- * @return This, to allow chaining.
- */
- public RoutingNodeIterator next() {
- entry = it.hasNext() ? it.next() : null;
- return this;
- }
-
- /**
- * Skips the given number of children.
- *
- * @param num The number of children to skip.
- * @return This, to allow chaining.
- */
- public RoutingNodeIterator skip(int num) {
- for (int i = 0; i < num && isValid(); ++i) {
- next();
- }
- return this;
- }
-
- /**
- * Returns whether or not this iterator is valid.
- *
- * @return True if we are still pointing to a valid entry.
- */
- public boolean isValid() {
- return entry != null;
- }
-
- /**
- * Returns the route of the current child.
- *
- * @return The route.
- */
- public Route getRoute() {
- return entry.getRoute();
- }
-
- /**
- * Returns whether or not a reply is set in the current child.
- *
- * @return True if a reply is available.
- */
- public boolean hasReply() {
- return entry.hasReply();
- }
-
- /**
- * Removes and returns the reply of the current child. This is the correct way of reusing a reply of a child node,
- * the {@link #getReplyRef()} should be used when just inspecting a child reply.
- *
- * @return The reply.
- */
- public Reply removeReply() {
- Reply ret = entry.getReply();
- ret.getTrace().setLevel(entry.getTrace().getLevel());
- ret.getTrace().swap(entry.getTrace());
- entry.setReply(null);
- return ret;
- }
-
- /**
- * Returns the reply of the current child. It is VERY important that the reply returned by this function is not
- * reused anywhere. This is a reference to another node's reply, do NOT use it for anything but inspection. If you
- * want to retrieve and reuse it, call {@link #removeReply()} instead.
- *
- * @return The reply.
- */
- public Reply getReplyRef() {
- return entry.getReply();
- }
-}
+package com.yahoo.messagebus.routing;
+
+import com.yahoo.messagebus.Reply;
+
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Implements an iterator for routing nodes. Use {@link RoutingContext#getChildIterator()} to retrieve an instance of
+ * this.
+ *
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public class RoutingNodeIterator {
+
+ // The underlying iterator.
+ private Iterator<RoutingNode> it;
+
+ // The current child entry.
+ private RoutingNode entry;
+
+ /**
+ * Constructs a new iterator based on a given list.
+ *
+ * @param children The list to iterate through.
+ */
+ public RoutingNodeIterator(List<RoutingNode> children) {
+ it = children.iterator();
+ next();
+ }
+
+ /**
+ * Steps to the next child in the map.
+ *
+ * @return This, to allow chaining.
+ */
+ public RoutingNodeIterator next() {
+ entry = it.hasNext() ? it.next() : null;
+ return this;
+ }
+
+ /**
+ * Skips the given number of children.
+ *
+ * @param num The number of children to skip.
+ * @return This, to allow chaining.
+ */
+ public RoutingNodeIterator skip(int num) {
+ for (int i = 0; i < num && isValid(); ++i) {
+ next();
+ }
+ return this;
+ }
+
+ /**
+ * Returns whether or not this iterator is valid.
+ *
+ * @return True if we are still pointing to a valid entry.
+ */
+ public boolean isValid() {
+ return entry != null;
+ }
+
+ /**
+ * Returns the route of the current child.
+ *
+ * @return The route.
+ */
+ public Route getRoute() {
+ return entry.getRoute();
+ }
+
+ /**
+ * Returns whether or not a reply is set in the current child.
+ *
+ * @return True if a reply is available.
+ */
+ public boolean hasReply() {
+ return entry.hasReply();
+ }
+
+ /**
+ * Removes and returns the reply of the current child. This is the correct way of reusing a reply of a child node,
+ * the {@link #getReplyRef()} should be used when just inspecting a child reply.
+ *
+ * @return The reply.
+ */
+ public Reply removeReply() {
+ Reply ret = entry.getReply();
+ ret.getTrace().setLevel(entry.getTrace().getLevel());
+ ret.getTrace().swap(entry.getTrace());
+ entry.setReply(null);
+ return ret;
+ }
+
+ /**
+ * Returns the reply of the current child. It is VERY important that the reply returned by this function is not
+ * reused anywhere. This is a reference to another node's reply, do NOT use it for anything but inspection. If you
+ * want to retrieve and reuse it, call {@link #removeReply()} instead.
+ *
+ * @return The reply.
+ */
+ public Reply getReplyRef() {
+ return entry.getReply();
+ }
+}
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/routing/TcpDirective.java b/messagebus/src/main/java/com/yahoo/messagebus/routing/TcpDirective.java
index dd0f6a47596..85be2c26baf 100755
--- a/messagebus/src/main/java/com/yahoo/messagebus/routing/TcpDirective.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/routing/TcpDirective.java
@@ -1,100 +1,100 @@
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.messagebus.routing;
-
-/**
- * This class represents a tcp directive within a {@link Hop}'s selector. This is a connection string used to establish
- * a direct connection to a host, bypassing service lookups through Slobrok.
- *
- * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
- */
-public class TcpDirective implements HopDirective {
-
- private final String host;
- private final int port;
- private final String session;
-
- /**
- * Constructs a new directive to route directly to a tcp address.
- *
- * @param host The host name to connect to.
- * @param port The port to connect to.
- * @param session The session to route to.
- */
- public TcpDirective(String host, int port, String session) {
- this.host = host;
- this.port = port;
- this.session = session;
- }
-
- @Override
- public boolean matches(HopDirective dir) {
- if (!(dir instanceof TcpDirective)) {
- return false;
- }
- TcpDirective rhs = (TcpDirective)dir;
- return host.equals(rhs.host) && port == rhs.port && session.equals(rhs.session);
- }
-
- /**
- * Returns the host to connect to. This may be an ip address or a name.
- *
- * @return The host.
- */
- public String getHost() {
- return host;
- }
-
- /**
- * Returns the port to connect to on the remove host.
- *
- * @return The port number.
- */
- public int getPort() {
- return port;
- }
-
- /**
- * Returns the name of the session to route to.
- *
- * @return The session name.
- */
- public String getSession() {
- return session;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (!(obj instanceof TcpDirective)) {
- return false;
- }
- TcpDirective rhs = (TcpDirective)obj;
- if (!host.equals(rhs.host)) {
- return false;
- }
- if (port != rhs.port) {
- return false;
- }
- if (!session.equals(rhs.session)) {
- return false;
- }
- return true;
- }
-
- @Override
- public String toString() {
- return "tcp/" + host + ":" + port + "/" + session;
- }
-
- @Override
- public String toDebugString() {
- return "TcpDirective(host = '" + host + "', port = " + port + ", session = '" + session + "')";
- }
-
- @Override
- public int hashCode() {
- int result = host != null ? host.hashCode() : 0;
- result = 31 * result + port;
- result = 31 * result + (session != null ? session.hashCode() : 0);
- return result;
- }
-}
+package com.yahoo.messagebus.routing;
+
+/**
+ * This class represents a tcp directive within a {@link Hop}'s selector. This is a connection string used to establish
+ * a direct connection to a host, bypassing service lookups through Slobrok.
+ *
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public class TcpDirective implements HopDirective {
+
+ private final String host;
+ private final int port;
+ private final String session;
+
+ /**
+ * Constructs a new directive to route directly to a tcp address.
+ *
+ * @param host The host name to connect to.
+ * @param port The port to connect to.
+ * @param session The session to route to.
+ */
+ public TcpDirective(String host, int port, String session) {
+ this.host = host;
+ this.port = port;
+ this.session = session;
+ }
+
+ @Override
+ public boolean matches(HopDirective dir) {
+ if (!(dir instanceof TcpDirective)) {
+ return false;
+ }
+ TcpDirective rhs = (TcpDirective)dir;
+ return host.equals(rhs.host) && port == rhs.port && session.equals(rhs.session);
+ }
+
+ /**
+ * Returns the host to connect to. This may be an ip address or a name.
+ *
+ * @return The host.
+ */
+ public String getHost() {
+ return host;
+ }
+
+ /**
+ * Returns the port to connect to on the remove host.
+ *
+ * @return The port number.
+ */
+ public int getPort() {
+ return port;
+ }
+
+ /**
+ * Returns the name of the session to route to.
+ *
+ * @return The session name.
+ */
+ public String getSession() {
+ return session;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof TcpDirective)) {
+ return false;
+ }
+ TcpDirective rhs = (TcpDirective)obj;
+ if (!host.equals(rhs.host)) {
+ return false;
+ }
+ if (port != rhs.port) {
+ return false;
+ }
+ if (!session.equals(rhs.session)) {
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ return "tcp/" + host + ":" + port + "/" + session;
+ }
+
+ @Override
+ public String toDebugString() {
+ return "TcpDirective(host = '" + host + "', port = " + port + ", session = '" + session + "')";
+ }
+
+ @Override
+ public int hashCode() {
+ int result = host != null ? host.hashCode() : 0;
+ result = 31 * result + port;
+ result = 31 * result + (session != null ? session.hashCode() : 0);
+ return result;
+ }
+}
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/routing/VerbatimDirective.java b/messagebus/src/main/java/com/yahoo/messagebus/routing/VerbatimDirective.java
index f2f95fa53f8..0f743ef8bf5 100755
--- a/messagebus/src/main/java/com/yahoo/messagebus/routing/VerbatimDirective.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/routing/VerbatimDirective.java
@@ -1,65 +1,65 @@
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.messagebus.routing;
-
-/**
- * This class represents a verbatim match within a {@link Hop}'s selector. This is nothing more than a string that will
- * be used as-is when performing service name lookups.
- *
- * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
- */
-public class VerbatimDirective implements HopDirective {
-
- private final String image;
-
- /**
- * Constructs a new verbatim selector item for a given image.
- *
- * @param image The image to assign to this.
- */
- public VerbatimDirective(String image) {
- this.image = image;
- }
-
- @Override
- public boolean matches(HopDirective dir) {
-
- return dir instanceof VerbatimDirective && image.equals(((VerbatimDirective)dir).image);
- }
-
- /**
- * Returns the image to which this is a verbatim match.
- *
- * @return The image.
- */
- public String getImage() {
- return image;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (!(obj instanceof VerbatimDirective)) {
- return false;
- }
- VerbatimDirective rhs = (VerbatimDirective)obj;
- if (!image.equals(rhs.image)) {
- return false;
- }
- return true;
- }
-
- @Override
- public int hashCode() {
- return image != null ? image.hashCode() : 0;
- }
-
- @Override
- public String toString() {
- return image;
- }
-
- @Override
- public String toDebugString() {
- return "VerbatimDirective(image = '" + image + "')";
- }
-}
-
+package com.yahoo.messagebus.routing;
+
+/**
+ * This class represents a verbatim match within a {@link Hop}'s selector. This is nothing more than a string that will
+ * be used as-is when performing service name lookups.
+ *
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public class VerbatimDirective implements HopDirective {
+
+ private final String image;
+
+ /**
+ * Constructs a new verbatim selector item for a given image.
+ *
+ * @param image The image to assign to this.
+ */
+ public VerbatimDirective(String image) {
+ this.image = image;
+ }
+
+ @Override
+ public boolean matches(HopDirective dir) {
+
+ return dir instanceof VerbatimDirective && image.equals(((VerbatimDirective)dir).image);
+ }
+
+ /**
+ * Returns the image to which this is a verbatim match.
+ *
+ * @return The image.
+ */
+ public String getImage() {
+ return image;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof VerbatimDirective)) {
+ return false;
+ }
+ VerbatimDirective rhs = (VerbatimDirective)obj;
+ if (!image.equals(rhs.image)) {
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ return image != null ? image.hashCode() : 0;
+ }
+
+ @Override
+ public String toString() {
+ return image;
+ }
+
+ @Override
+ public String toDebugString() {
+ return "VerbatimDirective(image = '" + image + "')";
+ }
+}
+
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/routing/test/CustomPolicyFactory.java b/messagebus/src/main/java/com/yahoo/messagebus/routing/test/CustomPolicyFactory.java
index 60f20b55f8d..2fa167ac571 100755
--- a/messagebus/src/main/java/com/yahoo/messagebus/routing/test/CustomPolicyFactory.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/routing/test/CustomPolicyFactory.java
@@ -1,52 +1,52 @@
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.messagebus.routing.test;
-
-import com.yahoo.messagebus.routing.Route;
-import com.yahoo.messagebus.routing.RoutingPolicy;
-import com.yahoo.messagebus.test.SimpleProtocol;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-/**
- * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
- */
-public class CustomPolicyFactory implements SimpleProtocol.PolicyFactory {
-
- private boolean selectOnRetry;
- private final List<Integer> consumableErrors = new ArrayList<Integer>();
-
- public CustomPolicyFactory() {
- this(true);
- }
-
- public CustomPolicyFactory(boolean selectOnRetry) {
- this(selectOnRetry, new ArrayList<Integer>());
- }
-
- public CustomPolicyFactory(boolean selectOnRetry, int consumableError) {
- this(selectOnRetry, Arrays.asList(consumableError));
- }
-
- public CustomPolicyFactory(boolean selectOnRetry, List<Integer> consumableErrors) {
- this.selectOnRetry = selectOnRetry;
- this.consumableErrors.addAll(consumableErrors);
- }
-
- public RoutingPolicy create(String param) {
- return new CustomPolicy(selectOnRetry, consumableErrors, parseRoutes(param));
- }
-
- public static List<Route> parseRoutes(String routes) {
- List<Route> ret = new ArrayList<Route>();
- if (routes != null && !routes.isEmpty()) {
- for (String route : routes.split(",")) {
- Route r = Route.parse(route);
- assert(route.equals(r.toString()));
- ret.add(r);
- }
- }
- return ret;
- }
-}
+package com.yahoo.messagebus.routing.test;
+
+import com.yahoo.messagebus.routing.Route;
+import com.yahoo.messagebus.routing.RoutingPolicy;
+import com.yahoo.messagebus.test.SimpleProtocol;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public class CustomPolicyFactory implements SimpleProtocol.PolicyFactory {
+
+ private boolean selectOnRetry;
+ private final List<Integer> consumableErrors = new ArrayList<Integer>();
+
+ public CustomPolicyFactory() {
+ this(true);
+ }
+
+ public CustomPolicyFactory(boolean selectOnRetry) {
+ this(selectOnRetry, new ArrayList<Integer>());
+ }
+
+ public CustomPolicyFactory(boolean selectOnRetry, int consumableError) {
+ this(selectOnRetry, Arrays.asList(consumableError));
+ }
+
+ public CustomPolicyFactory(boolean selectOnRetry, List<Integer> consumableErrors) {
+ this.selectOnRetry = selectOnRetry;
+ this.consumableErrors.addAll(consumableErrors);
+ }
+
+ public RoutingPolicy create(String param) {
+ return new CustomPolicy(selectOnRetry, consumableErrors, parseRoutes(param));
+ }
+
+ public static List<Route> parseRoutes(String routes) {
+ List<Route> ret = new ArrayList<Route>();
+ if (routes != null && !routes.isEmpty()) {
+ for (String route : routes.split(",")) {
+ Route r = Route.parse(route);
+ assert(route.equals(r.toString()));
+ ret.add(r);
+ }
+ }
+ return ret;
+ }
+}
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/ErrorTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/ErrorTestCase.java
index e67fc5030ed..34e48bafc19 100755
--- a/messagebus/src/test/java/com/yahoo/messagebus/ErrorTestCase.java
+++ b/messagebus/src/test/java/com/yahoo/messagebus/ErrorTestCase.java
@@ -1,92 +1,92 @@
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.messagebus;
-
-import com.yahoo.jrt.slobrok.server.Slobrok;
-import com.yahoo.messagebus.network.rpc.test.TestServer;
-import com.yahoo.messagebus.routing.RoutingTableSpec;
-import com.yahoo.messagebus.test.Receptor;
-import com.yahoo.messagebus.test.SimpleMessage;
-import com.yahoo.messagebus.test.SimpleProtocol;
-import org.junit.Test;
-
-import java.util.Arrays;
-
-import static org.junit.Assert.*;
-
-/**
- * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
- */
-public class ErrorTestCase {
-
- @Test
- public void requireThatAccessorsWork() {
- Error err = new Error(69, "foo");
- assertEquals(69, err.getCode());
- assertEquals("foo", err.getMessage());
-
- assertFalse(new Error(ErrorCode.TRANSIENT_ERROR, "foo").isFatal());
- assertFalse(new Error(ErrorCode.TRANSIENT_ERROR + 1, "foo").isFatal());
- assertTrue(new Error(ErrorCode.FATAL_ERROR, "foo").isFatal());
- assertTrue(new Error(ErrorCode.FATAL_ERROR + 1, "foo").isFatal());
- }
-
- @Test
- public void requireThatErrorIsPropagated() throws Exception {
- RoutingTableSpec table = new RoutingTableSpec(SimpleProtocol.NAME);
- table.addHop("itr", "test/itr/session", Arrays.asList("test/itr/session"));
- table.addHop("dst", "test/dst/session", Arrays.asList("test/dst/session"));
- table.addRoute("test", Arrays.asList("itr", "dst"));
-
- Slobrok slobrok = new Slobrok();
- TestServer src = new TestServer("test/src", table, slobrok, null, null);
- TestServer itr = new TestServer("test/itr", table, slobrok, null, null);
- TestServer dst = new TestServer("test/dst", table, slobrok, null, null);
-
- Receptor ss_rr = new Receptor();
- SourceSession ss = src.mb.createSourceSession(ss_rr);
-
- Receptor is_mr = new Receptor();
- Receptor is_rr = new Receptor();
- IntermediateSession is = itr.mb.createIntermediateSession("session", true, is_mr, is_rr);
-
- Receptor ds_mr = new Receptor();
- DestinationSession ds = dst.mb.createDestinationSession("session", true, ds_mr);
-
- src.waitSlobrok("test/itr/session", 1);
- src.waitSlobrok("test/dst/session", 1);
- itr.waitSlobrok("test/dst/session", 1);
-
- for (int i = 0; i < 5; i++) {
- assertTrue(ss.send(new SimpleMessage("msg"), "test").isAccepted());
- Message msg = is_mr.getMessage(60);
- assertNotNull(msg);
- is.forward(msg);
-
- assertNotNull(msg = ds_mr.getMessage(60));
- Reply reply = new EmptyReply();
- msg.swapState(reply);
- reply.addError(new Error(ErrorCode.APP_FATAL_ERROR, "fatality"));
- ds.reply(reply);
-
- assertNotNull(reply = is_rr.getReply(60));
- assertEquals(reply.getNumErrors(), 1);
- assertEquals(reply.getError(0).getService(), "test/dst/session");
- reply.addError(new Error(ErrorCode.APP_FATAL_ERROR, "fatality"));
- is.forward(reply);
-
- assertNotNull(reply = ss_rr.getReply(60));
- assertEquals(reply.getNumErrors(), 2);
- assertEquals(reply.getError(0).getService(), "test/dst/session");
- assertEquals(reply.getError(1).getService(), "test/itr/session");
- }
-
- ss.destroy();
- is.destroy();
- ds.destroy();
-
- dst.destroy();
- itr.destroy();
- src.destroy();
- slobrok.stop();
- }
-}
+package com.yahoo.messagebus;
+
+import com.yahoo.jrt.slobrok.server.Slobrok;
+import com.yahoo.messagebus.network.rpc.test.TestServer;
+import com.yahoo.messagebus.routing.RoutingTableSpec;
+import com.yahoo.messagebus.test.Receptor;
+import com.yahoo.messagebus.test.SimpleMessage;
+import com.yahoo.messagebus.test.SimpleProtocol;
+import org.junit.Test;
+
+import java.util.Arrays;
+
+import static org.junit.Assert.*;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public class ErrorTestCase {
+
+ @Test
+ public void requireThatAccessorsWork() {
+ Error err = new Error(69, "foo");
+ assertEquals(69, err.getCode());
+ assertEquals("foo", err.getMessage());
+
+ assertFalse(new Error(ErrorCode.TRANSIENT_ERROR, "foo").isFatal());
+ assertFalse(new Error(ErrorCode.TRANSIENT_ERROR + 1, "foo").isFatal());
+ assertTrue(new Error(ErrorCode.FATAL_ERROR, "foo").isFatal());
+ assertTrue(new Error(ErrorCode.FATAL_ERROR + 1, "foo").isFatal());
+ }
+
+ @Test
+ public void requireThatErrorIsPropagated() throws Exception {
+ RoutingTableSpec table = new RoutingTableSpec(SimpleProtocol.NAME);
+ table.addHop("itr", "test/itr/session", Arrays.asList("test/itr/session"));
+ table.addHop("dst", "test/dst/session", Arrays.asList("test/dst/session"));
+ table.addRoute("test", Arrays.asList("itr", "dst"));
+
+ Slobrok slobrok = new Slobrok();
+ TestServer src = new TestServer("test/src", table, slobrok, null, null);
+ TestServer itr = new TestServer("test/itr", table, slobrok, null, null);
+ TestServer dst = new TestServer("test/dst", table, slobrok, null, null);
+
+ Receptor ss_rr = new Receptor();
+ SourceSession ss = src.mb.createSourceSession(ss_rr);
+
+ Receptor is_mr = new Receptor();
+ Receptor is_rr = new Receptor();
+ IntermediateSession is = itr.mb.createIntermediateSession("session", true, is_mr, is_rr);
+
+ Receptor ds_mr = new Receptor();
+ DestinationSession ds = dst.mb.createDestinationSession("session", true, ds_mr);
+
+ src.waitSlobrok("test/itr/session", 1);
+ src.waitSlobrok("test/dst/session", 1);
+ itr.waitSlobrok("test/dst/session", 1);
+
+ for (int i = 0; i < 5; i++) {
+ assertTrue(ss.send(new SimpleMessage("msg"), "test").isAccepted());
+ Message msg = is_mr.getMessage(60);
+ assertNotNull(msg);
+ is.forward(msg);
+
+ assertNotNull(msg = ds_mr.getMessage(60));
+ Reply reply = new EmptyReply();
+ msg.swapState(reply);
+ reply.addError(new Error(ErrorCode.APP_FATAL_ERROR, "fatality"));
+ ds.reply(reply);
+
+ assertNotNull(reply = is_rr.getReply(60));
+ assertEquals(reply.getNumErrors(), 1);
+ assertEquals(reply.getError(0).getService(), "test/dst/session");
+ reply.addError(new Error(ErrorCode.APP_FATAL_ERROR, "fatality"));
+ is.forward(reply);
+
+ assertNotNull(reply = ss_rr.getReply(60));
+ assertEquals(reply.getNumErrors(), 2);
+ assertEquals(reply.getError(0).getService(), "test/dst/session");
+ assertEquals(reply.getError(1).getService(), "test/itr/session");
+ }
+
+ ss.destroy();
+ is.destroy();
+ ds.destroy();
+
+ dst.destroy();
+ itr.destroy();
+ src.destroy();
+ slobrok.stop();
+ }
+}
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/SendProxyTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/SendProxyTestCase.java
index 6eb7257328b..8026b93e6d9 100644
--- a/messagebus/src/test/java/com/yahoo/messagebus/SendProxyTestCase.java
+++ b/messagebus/src/test/java/com/yahoo/messagebus/SendProxyTestCase.java
@@ -1,169 +1,169 @@
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.messagebus;
-
-import com.yahoo.component.Vtag;
-import com.yahoo.jrt.ListenFailedException;
-import com.yahoo.jrt.slobrok.server.Slobrok;
-import com.yahoo.log.LogLevel;
-import com.yahoo.messagebus.network.Identity;
-import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
-import com.yahoo.messagebus.network.rpc.test.TestServer;
-import com.yahoo.messagebus.routing.Route;
-import com.yahoo.messagebus.test.Receptor;
-import com.yahoo.messagebus.test.SimpleMessage;
-import com.yahoo.messagebus.test.SimpleProtocol;
-import com.yahoo.messagebus.test.SimpleReply;
-import junit.framework.TestCase;
-
-import java.net.UnknownHostException;
-import java.util.logging.Handler;
-import java.util.logging.LogRecord;
-import java.util.logging.Logger;
-
-/**
- * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
- */
-public class SendProxyTestCase extends TestCase {
-
- Slobrok slobrok;
- TestServer srcServer, dstServer;
- SourceSession srcSession;
- DestinationSession dstSession;
-
- @Override
- public void setUp() throws UnknownHostException, ListenFailedException {
- slobrok = new Slobrok();
- dstServer = new TestServer(new MessageBusParams().addProtocol(new SimpleProtocol()),
- new RPCNetworkParams().setIdentity(new Identity("dst")).setSlobrokConfigId(TestServer.getSlobrokConfig(slobrok)));
- dstSession = dstServer.mb.createDestinationSession(new DestinationSessionParams().setName("session").setMessageHandler(new Receptor()));
- srcServer = new TestServer(new MessageBusParams().addProtocol(new SimpleProtocol()),
- new RPCNetworkParams().setSlobrokConfigId(TestServer.getSlobrokConfig(slobrok)));
- srcSession = srcServer.mb.createSourceSession(
- new SourceSessionParams().setTimeout(600.0).setThrottlePolicy(null).setReplyHandler(new Receptor()));
- assertTrue(srcServer.waitSlobrok("dst/session", 1));
- }
-
- @Override
- public void tearDown() {
- slobrok.stop();
- dstSession.destroy();
- dstServer.destroy();
- srcSession.destroy();
- srcServer.destroy();
- }
-
- public void testTraceByLogLevel() {
- Logger log = Logger.getLogger(SendProxy.class.getName());
- LogHandler logHandler = new LogHandler();
- log.addHandler(logHandler);
-
- log.setLevel(LogLevel.INFO);
- sendMessage(0, null);
- assertNull(logHandler.trace);
-
- log.setLevel(LogLevel.DEBUG);
- sendMessage(0, null);
- assertNull(logHandler.trace);
-
- sendMessage(1, new Error(ErrorCode.FATAL_ERROR, "err"));
- assertNull(logHandler.trace);
-
- sendMessage(0, new Error(ErrorCode.FATAL_ERROR, "err"));
- assertEquals("Trace for reply with error(s):\n" +
- "<trace>\n" +
- " <trace>\n" +
- " Sending message (version ${VERSION}) from client to 'dst/session' with x seconds timeout.\n" +
- " <trace>\n" +
- " Message (type 1) received at 'dst' for session 'session'.\n" +
- " [FATAL_ERROR @ localhost]: err\n" +
- " Sending reply (version ${VERSION}) from 'dst'.\n" +
- " </trace>\n" +
- " Reply (type 2) received at client.\n" +
- " </trace>\n" +
- "</trace>\n", logHandler.trace);
- logHandler.trace = null;
-
- log.setLevel(LogLevel.SPAM);
- sendMessage(1, null);
- assertNull(logHandler.trace);
-
- sendMessage(0, null);
- assertEquals("Trace for reply:\n" +
- "<trace>\n" +
- " <trace>\n" +
- " Sending message (version ${VERSION}) from client to 'dst/session' with x seconds timeout.\n" +
- " <trace>\n" +
- " Message (type 1) received at 'dst' for session 'session'.\n" +
- " Sending reply (version ${VERSION}) from 'dst'.\n" +
- " </trace>\n" +
- " Reply (type 0) received at client.\n" +
- " </trace>\n" +
- "</trace>\n", logHandler.trace);
- logHandler.trace = null;
-
- sendMessage(1, new Error(ErrorCode.FATAL_ERROR, "err"));
- assertNull(logHandler.trace);
-
- sendMessage(0, new Error(ErrorCode.FATAL_ERROR, "err"));
- assertEquals("Trace for reply with error(s):\n" +
- "<trace>\n" +
- " <trace>\n" +
- " Sending message (version ${VERSION}) from client to 'dst/session' with x seconds timeout.\n" +
- " <trace>\n" +
- " Message (type 1) received at 'dst' for session 'session'.\n" +
- " [FATAL_ERROR @ localhost]: err\n" +
- " Sending reply (version ${VERSION}) from 'dst'.\n" +
- " </trace>\n" +
- " Reply (type 2) received at client.\n" +
- " </trace>\n" +
- "</trace>\n", logHandler.trace);
- logHandler.trace = null;
- }
-
- private void sendMessage(int traceLevel, Error err) {
- Message msg = new SimpleMessage("foo");
- msg.getTrace().setLevel(traceLevel);
- assertTrue(srcSession.send(msg, Route.parse("dst/session")).isAccepted());
- assertNotNull(msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60));
- if (err != null) {
- Reply reply = new SimpleReply("bar");
- reply.swapState(msg);
- reply.addError(err);
- dstSession.reply(reply);
- } else {
- dstSession.acknowledge(msg);
- }
- Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60);
- assertNotNull(reply);
- }
-
- private static class LogHandler extends Handler {
-
- String trace = null;
-
- @Override
- public void publish(LogRecord record) {
- String msg = record.getMessage();
- if (msg.startsWith("Trace ")) {
- msg = msg.replaceAll("\\[.*\\] ", "");
- msg = msg.replaceAll("[0-9]+\\.[0-9]+ seconds", "x seconds");
-
- String ver = Vtag.currentVersion.toString();
- for (int i = msg.indexOf(ver); i >= 0; i = msg.indexOf(ver, i)) {
- msg = msg.substring(0, i) + "${VERSION}" + msg.substring(i + ver.length());
- }
- trace = msg;
- }
- }
-
- @Override
- public void flush() {
- // empty
- }
-
- @Override
- public void close() throws SecurityException {
- // empty
- }
- }
-}
+package com.yahoo.messagebus;
+
+import com.yahoo.component.Vtag;
+import com.yahoo.jrt.ListenFailedException;
+import com.yahoo.jrt.slobrok.server.Slobrok;
+import com.yahoo.log.LogLevel;
+import com.yahoo.messagebus.network.Identity;
+import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
+import com.yahoo.messagebus.network.rpc.test.TestServer;
+import com.yahoo.messagebus.routing.Route;
+import com.yahoo.messagebus.test.Receptor;
+import com.yahoo.messagebus.test.SimpleMessage;
+import com.yahoo.messagebus.test.SimpleProtocol;
+import com.yahoo.messagebus.test.SimpleReply;
+import junit.framework.TestCase;
+
+import java.net.UnknownHostException;
+import java.util.logging.Handler;
+import java.util.logging.LogRecord;
+import java.util.logging.Logger;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public class SendProxyTestCase extends TestCase {
+
+ Slobrok slobrok;
+ TestServer srcServer, dstServer;
+ SourceSession srcSession;
+ DestinationSession dstSession;
+
+ @Override
+ public void setUp() throws UnknownHostException, ListenFailedException {
+ slobrok = new Slobrok();
+ dstServer = new TestServer(new MessageBusParams().addProtocol(new SimpleProtocol()),
+ new RPCNetworkParams().setIdentity(new Identity("dst")).setSlobrokConfigId(TestServer.getSlobrokConfig(slobrok)));
+ dstSession = dstServer.mb.createDestinationSession(new DestinationSessionParams().setName("session").setMessageHandler(new Receptor()));
+ srcServer = new TestServer(new MessageBusParams().addProtocol(new SimpleProtocol()),
+ new RPCNetworkParams().setSlobrokConfigId(TestServer.getSlobrokConfig(slobrok)));
+ srcSession = srcServer.mb.createSourceSession(
+ new SourceSessionParams().setTimeout(600.0).setThrottlePolicy(null).setReplyHandler(new Receptor()));
+ assertTrue(srcServer.waitSlobrok("dst/session", 1));
+ }
+
+ @Override
+ public void tearDown() {
+ slobrok.stop();
+ dstSession.destroy();
+ dstServer.destroy();
+ srcSession.destroy();
+ srcServer.destroy();
+ }
+
+ public void testTraceByLogLevel() {
+ Logger log = Logger.getLogger(SendProxy.class.getName());
+ LogHandler logHandler = new LogHandler();
+ log.addHandler(logHandler);
+
+ log.setLevel(LogLevel.INFO);
+ sendMessage(0, null);
+ assertNull(logHandler.trace);
+
+ log.setLevel(LogLevel.DEBUG);
+ sendMessage(0, null);
+ assertNull(logHandler.trace);
+
+ sendMessage(1, new Error(ErrorCode.FATAL_ERROR, "err"));
+ assertNull(logHandler.trace);
+
+ sendMessage(0, new Error(ErrorCode.FATAL_ERROR, "err"));
+ assertEquals("Trace for reply with error(s):\n" +
+ "<trace>\n" +
+ " <trace>\n" +
+ " Sending message (version ${VERSION}) from client to 'dst/session' with x seconds timeout.\n" +
+ " <trace>\n" +
+ " Message (type 1) received at 'dst' for session 'session'.\n" +
+ " [FATAL_ERROR @ localhost]: err\n" +
+ " Sending reply (version ${VERSION}) from 'dst'.\n" +
+ " </trace>\n" +
+ " Reply (type 2) received at client.\n" +
+ " </trace>\n" +
+ "</trace>\n", logHandler.trace);
+ logHandler.trace = null;
+
+ log.setLevel(LogLevel.SPAM);
+ sendMessage(1, null);
+ assertNull(logHandler.trace);
+
+ sendMessage(0, null);
+ assertEquals("Trace for reply:\n" +
+ "<trace>\n" +
+ " <trace>\n" +
+ " Sending message (version ${VERSION}) from client to 'dst/session' with x seconds timeout.\n" +
+ " <trace>\n" +
+ " Message (type 1) received at 'dst' for session 'session'.\n" +
+ " Sending reply (version ${VERSION}) from 'dst'.\n" +
+ " </trace>\n" +
+ " Reply (type 0) received at client.\n" +
+ " </trace>\n" +
+ "</trace>\n", logHandler.trace);
+ logHandler.trace = null;
+
+ sendMessage(1, new Error(ErrorCode.FATAL_ERROR, "err"));
+ assertNull(logHandler.trace);
+
+ sendMessage(0, new Error(ErrorCode.FATAL_ERROR, "err"));
+ assertEquals("Trace for reply with error(s):\n" +
+ "<trace>\n" +
+ " <trace>\n" +
+ " Sending message (version ${VERSION}) from client to 'dst/session' with x seconds timeout.\n" +
+ " <trace>\n" +
+ " Message (type 1) received at 'dst' for session 'session'.\n" +
+ " [FATAL_ERROR @ localhost]: err\n" +
+ " Sending reply (version ${VERSION}) from 'dst'.\n" +
+ " </trace>\n" +
+ " Reply (type 2) received at client.\n" +
+ " </trace>\n" +
+ "</trace>\n", logHandler.trace);
+ logHandler.trace = null;
+ }
+
+ private void sendMessage(int traceLevel, Error err) {
+ Message msg = new SimpleMessage("foo");
+ msg.getTrace().setLevel(traceLevel);
+ assertTrue(srcSession.send(msg, Route.parse("dst/session")).isAccepted());
+ assertNotNull(msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60));
+ if (err != null) {
+ Reply reply = new SimpleReply("bar");
+ reply.swapState(msg);
+ reply.addError(err);
+ dstSession.reply(reply);
+ } else {
+ dstSession.acknowledge(msg);
+ }
+ Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60);
+ assertNotNull(reply);
+ }
+
+ private static class LogHandler extends Handler {
+
+ String trace = null;
+
+ @Override
+ public void publish(LogRecord record) {
+ String msg = record.getMessage();
+ if (msg.startsWith("Trace ")) {
+ msg = msg.replaceAll("\\[.*\\] ", "");
+ msg = msg.replaceAll("[0-9]+\\.[0-9]+ seconds", "x seconds");
+
+ String ver = Vtag.currentVersion.toString();
+ for (int i = msg.indexOf(ver); i >= 0; i = msg.indexOf(ver, i)) {
+ msg = msg.substring(0, i) + "${VERSION}" + msg.substring(i + ver.length());
+ }
+ trace = msg;
+ }
+ }
+
+ @Override
+ public void flush() {
+ // empty
+ }
+
+ @Override
+ public void close() throws SecurityException {
+ // empty
+ }
+ }
+}
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/SimpleTripTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/SimpleTripTestCase.java
index 2795758d922..e0906a6feb4 100755
--- a/messagebus/src/test/java/com/yahoo/messagebus/SimpleTripTestCase.java
+++ b/messagebus/src/test/java/com/yahoo/messagebus/SimpleTripTestCase.java
@@ -1,53 +1,53 @@
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.messagebus;
-
-import com.yahoo.jrt.ListenFailedException;
-import com.yahoo.jrt.slobrok.server.Slobrok;
-import com.yahoo.messagebus.network.Identity;
-import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
-import com.yahoo.messagebus.network.rpc.test.TestServer;
-import com.yahoo.messagebus.routing.Route;
-import com.yahoo.messagebus.test.Receptor;
-import com.yahoo.messagebus.test.SimpleMessage;
-import com.yahoo.messagebus.test.SimpleProtocol;
-import com.yahoo.messagebus.test.SimpleReply;
-
-import java.net.UnknownHostException;
-
-/**
- * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
- */
-public class SimpleTripTestCase extends junit.framework.TestCase {
-
- public void testSimpleTrip() throws ListenFailedException, UnknownHostException {
- Slobrok slobrok = new Slobrok();
- TestServer server = new TestServer(new MessageBusParams().addProtocol(new SimpleProtocol()),
- new RPCNetworkParams()
- .setIdentity(new Identity("srv"))
- .setSlobrokConfigId(TestServer.getSlobrokConfig(slobrok)));
- DestinationSession dst = server.mb.createDestinationSession(new DestinationSessionParams().setName("session").setMessageHandler(new Receptor()));
- SourceSession src = server.mb.createSourceSession(
- new SourceSessionParams().setTimeout(600.0).setReplyHandler(new Receptor()));
- assertTrue(server.waitSlobrok("srv/session", 1));
-
- assertTrue(src.send(new SimpleMessage("msg"), Route.parse("srv/session")).isAccepted());
- Message msg = ((Receptor)dst.getMessageHandler()).getMessage(60);
- assertNotNull(msg);
- assertEquals(SimpleProtocol.NAME, msg.getProtocol());
- assertEquals(SimpleProtocol.MESSAGE, msg.getType());
- assertEquals("msg", ((SimpleMessage)msg).getValue());
-
- Reply reply = new SimpleReply("reply");
- reply.swapState(msg);
- dst.reply(reply);
-
- assertNotNull(reply = ((Receptor)src.getReplyHandler()).getReply(60));
- assertEquals(SimpleProtocol.NAME, reply.getProtocol());
- assertEquals(SimpleProtocol.REPLY, reply.getType());
- assertEquals("reply", ((SimpleReply)reply).getValue());
-
- src.destroy();
- dst.destroy();
- server.destroy();
- }
-}
+package com.yahoo.messagebus;
+
+import com.yahoo.jrt.ListenFailedException;
+import com.yahoo.jrt.slobrok.server.Slobrok;
+import com.yahoo.messagebus.network.Identity;
+import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
+import com.yahoo.messagebus.network.rpc.test.TestServer;
+import com.yahoo.messagebus.routing.Route;
+import com.yahoo.messagebus.test.Receptor;
+import com.yahoo.messagebus.test.SimpleMessage;
+import com.yahoo.messagebus.test.SimpleProtocol;
+import com.yahoo.messagebus.test.SimpleReply;
+
+import java.net.UnknownHostException;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public class SimpleTripTestCase extends junit.framework.TestCase {
+
+ public void testSimpleTrip() throws ListenFailedException, UnknownHostException {
+ Slobrok slobrok = new Slobrok();
+ TestServer server = new TestServer(new MessageBusParams().addProtocol(new SimpleProtocol()),
+ new RPCNetworkParams()
+ .setIdentity(new Identity("srv"))
+ .setSlobrokConfigId(TestServer.getSlobrokConfig(slobrok)));
+ DestinationSession dst = server.mb.createDestinationSession(new DestinationSessionParams().setName("session").setMessageHandler(new Receptor()));
+ SourceSession src = server.mb.createSourceSession(
+ new SourceSessionParams().setTimeout(600.0).setReplyHandler(new Receptor()));
+ assertTrue(server.waitSlobrok("srv/session", 1));
+
+ assertTrue(src.send(new SimpleMessage("msg"), Route.parse("srv/session")).isAccepted());
+ Message msg = ((Receptor)dst.getMessageHandler()).getMessage(60);
+ assertNotNull(msg);
+ assertEquals(SimpleProtocol.NAME, msg.getProtocol());
+ assertEquals(SimpleProtocol.MESSAGE, msg.getType());
+ assertEquals("msg", ((SimpleMessage)msg).getValue());
+
+ Reply reply = new SimpleReply("reply");
+ reply.swapState(msg);
+ dst.reply(reply);
+
+ assertNotNull(reply = ((Receptor)src.getReplyHandler()).getReply(60));
+ assertEquals(SimpleProtocol.NAME, reply.getProtocol());
+ assertEquals(SimpleProtocol.REPLY, reply.getType());
+ assertEquals("reply", ((SimpleReply)reply).getValue());
+
+ src.destroy();
+ dst.destroy();
+ server.destroy();
+ }
+}
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/SendAdapterTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/SendAdapterTestCase.java
index d296d7c7058..a03066ec745 100755
--- a/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/SendAdapterTestCase.java
+++ b/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/SendAdapterTestCase.java
@@ -1,157 +1,157 @@
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.messagebus.network.rpc;
-
-import com.yahoo.component.Version;
-import com.yahoo.jrt.ListenFailedException;
-import com.yahoo.jrt.slobrok.server.Slobrok;
-import com.yahoo.messagebus.*;
-import com.yahoo.messagebus.network.Identity;
-import com.yahoo.messagebus.network.rpc.test.TestServer;
-import com.yahoo.messagebus.routing.Route;
-import com.yahoo.messagebus.test.Receptor;
-import com.yahoo.messagebus.test.SimpleMessage;
-import com.yahoo.messagebus.test.SimpleProtocol;
-import com.yahoo.messagebus.test.SimpleReply;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.net.UnknownHostException;
-import java.util.Arrays;
-import java.util.List;
-
-import static org.junit.Assert.*;
-
-/**
- * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
- */
-public class SendAdapterTestCase {
-
- ////////////////////////////////////////////////////////////////////////////////
- //
- // Setup
- //
- ////////////////////////////////////////////////////////////////////////////////
-
- Slobrok slobrok;
- TestServer srcServer, itrServer, dstServer;
- SourceSession srcSession;
- IntermediateSession itrSession;
- DestinationSession dstSession;
- TestProtocol srcProtocol, itrProtocol, dstProtocol;
-
- @Before
- public void setUp() throws ListenFailedException, UnknownHostException {
- slobrok = new Slobrok();
- dstServer = new TestServer(
- new MessageBusParams().addProtocol(dstProtocol = new TestProtocol()),
- new RPCNetworkParams().setIdentity(new Identity("dst")).setSlobrokConfigId(TestServer.getSlobrokConfig(slobrok)));
- dstSession = dstServer.mb.createDestinationSession(
- new DestinationSessionParams().setName("session").setMessageHandler(new Receptor()));
- itrServer = new TestServer(
- new MessageBusParams().addProtocol(itrProtocol = new TestProtocol()),
- new RPCNetworkParams().setIdentity(new Identity("itr")).setSlobrokConfigId(TestServer.getSlobrokConfig(slobrok)));
- itrSession = itrServer.mb.createIntermediateSession(
- new IntermediateSessionParams().setName("session").setMessageHandler(new Receptor()).setReplyHandler(new Receptor()));
- srcServer = new TestServer(
- new MessageBusParams().addProtocol(srcProtocol = new TestProtocol()),
- new RPCNetworkParams().setSlobrokConfigId(TestServer.getSlobrokConfig(slobrok)));
- srcSession = srcServer.mb.createSourceSession(
- new SourceSessionParams().setTimeout(600.0).setReplyHandler(new Receptor()));
- assertTrue(srcServer.waitSlobrok("*/session", 2));
- }
-
- @After
- public void tearDown() {
- slobrok.stop();
- dstSession.destroy();
- dstServer.destroy();
- itrSession.destroy();
- itrServer.destroy();
- srcSession.destroy();
- srcServer.destroy();
- }
-
- ////////////////////////////////////////////////////////////////////////////////
- //
- // Tests
- //
- ////////////////////////////////////////////////////////////////////////////////
-
- @Test
- public void requireThatMessagesCanBeSentAcrossAllSupportedVersions() throws Exception {
- List<Version> versions = Arrays.asList(new Version(5, 0), new Version(5, 1));
- for (Version srcVersion : versions) {
- for (Version itrVersion : versions) {
- for (Version dstVersion : versions) {
- assertVersionedSend(srcVersion, itrVersion, dstVersion);
- }
- }
- }
- }
-
- ////////////////////////////////////////////////////////////////////////////////
- //
- // Utilities
- //
- ////////////////////////////////////////////////////////////////////////////////
-
- private void assertVersionedSend(Version srcVersion, Version itrVersion, Version dstVersion) {
- System.out.println("Sending from " + srcVersion + " through " + itrVersion + " to " + dstVersion + ":");
- srcServer.net.setVersion(srcVersion);
- itrServer.net.setVersion(itrVersion);
- dstServer.net.setVersion(dstVersion);
-
- Message msg = new SimpleMessage("foo");
- msg.getTrace().setLevel(9);
- assertTrue(srcSession.send(msg, Route.parse("itr/session dst/session")).isAccepted());
- assertNotNull(msg = ((Receptor)itrSession.getMessageHandler()).getMessage(300));
- System.out.println("\tMessage version " + srcProtocol.lastVersion + " serialized at source.");
- Version minVersion = srcVersion.compareTo(itrVersion) < 0 ? srcVersion : itrVersion;
- assertEquals(minVersion, srcProtocol.lastVersion);
-
- System.out.println("\tMessage version " + itrProtocol.lastVersion + " reached intermediate.");
- assertEquals(minVersion, itrProtocol.lastVersion);
- itrSession.forward(msg);
- assertNotNull(msg = ((Receptor)dstSession.getMessageHandler()).getMessage(300));
- System.out.println("\tMessage version " + itrProtocol.lastVersion + " serialized at intermediate.");
- minVersion = itrVersion.compareTo(dstVersion) < 0 ? itrVersion : dstVersion;
- assertEquals(minVersion, itrProtocol.lastVersion);
-
- System.out.println("\tMessage version " + dstProtocol.lastVersion + " reached destination.");
- assertEquals(minVersion, dstProtocol.lastVersion);
- Reply reply = new SimpleReply("bar");
- reply.swapState(msg);
- dstSession.reply(reply);
- assertNotNull(reply = ((Receptor)itrSession.getReplyHandler()).getReply(300));
- System.out.println("\tReply version " + dstProtocol.lastVersion + " serialized at destination.");
- assertEquals(minVersion, dstProtocol.lastVersion);
-
- System.out.println("\tReply version " + itrProtocol.lastVersion + " reached intermediate.");
- assertEquals(minVersion, itrProtocol.lastVersion);
- itrSession.forward(reply);
- assertNotNull(((Receptor)srcSession.getReplyHandler()).getReply(300));
- System.out.println("\tReply version " + itrProtocol.lastVersion + " serialized at intermediate.");
- minVersion = srcVersion.compareTo(itrVersion) < 0 ? srcVersion : itrVersion;
- assertEquals(minVersion, itrProtocol.lastVersion);
-
- System.out.println("\tReply version " + srcProtocol.lastVersion + " reached source.");
- assertEquals(minVersion, srcProtocol.lastVersion);
- }
-
- private static class TestProtocol extends SimpleProtocol {
-
- Version lastVersion;
-
- @Override
- public byte[] encode(Version version, Routable routable) {
- lastVersion = version;
- return super.encode(version, routable);
- }
-
- public Routable decode(Version version, byte[] payload) {
- lastVersion = version;
- return super.decode(version, payload);
- }
- }
-}
+package com.yahoo.messagebus.network.rpc;
+
+import com.yahoo.component.Version;
+import com.yahoo.jrt.ListenFailedException;
+import com.yahoo.jrt.slobrok.server.Slobrok;
+import com.yahoo.messagebus.*;
+import com.yahoo.messagebus.network.Identity;
+import com.yahoo.messagebus.network.rpc.test.TestServer;
+import com.yahoo.messagebus.routing.Route;
+import com.yahoo.messagebus.test.Receptor;
+import com.yahoo.messagebus.test.SimpleMessage;
+import com.yahoo.messagebus.test.SimpleProtocol;
+import com.yahoo.messagebus.test.SimpleReply;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.*;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public class SendAdapterTestCase {
+
+ ////////////////////////////////////////////////////////////////////////////////
+ //
+ // Setup
+ //
+ ////////////////////////////////////////////////////////////////////////////////
+
+ Slobrok slobrok;
+ TestServer srcServer, itrServer, dstServer;
+ SourceSession srcSession;
+ IntermediateSession itrSession;
+ DestinationSession dstSession;
+ TestProtocol srcProtocol, itrProtocol, dstProtocol;
+
+ @Before
+ public void setUp() throws ListenFailedException, UnknownHostException {
+ slobrok = new Slobrok();
+ dstServer = new TestServer(
+ new MessageBusParams().addProtocol(dstProtocol = new TestProtocol()),
+ new RPCNetworkParams().setIdentity(new Identity("dst")).setSlobrokConfigId(TestServer.getSlobrokConfig(slobrok)));
+ dstSession = dstServer.mb.createDestinationSession(
+ new DestinationSessionParams().setName("session").setMessageHandler(new Receptor()));
+ itrServer = new TestServer(
+ new MessageBusParams().addProtocol(itrProtocol = new TestProtocol()),
+ new RPCNetworkParams().setIdentity(new Identity("itr")).setSlobrokConfigId(TestServer.getSlobrokConfig(slobrok)));
+ itrSession = itrServer.mb.createIntermediateSession(
+ new IntermediateSessionParams().setName("session").setMessageHandler(new Receptor()).setReplyHandler(new Receptor()));
+ srcServer = new TestServer(
+ new MessageBusParams().addProtocol(srcProtocol = new TestProtocol()),
+ new RPCNetworkParams().setSlobrokConfigId(TestServer.getSlobrokConfig(slobrok)));
+ srcSession = srcServer.mb.createSourceSession(
+ new SourceSessionParams().setTimeout(600.0).setReplyHandler(new Receptor()));
+ assertTrue(srcServer.waitSlobrok("*/session", 2));
+ }
+
+ @After
+ public void tearDown() {
+ slobrok.stop();
+ dstSession.destroy();
+ dstServer.destroy();
+ itrSession.destroy();
+ itrServer.destroy();
+ srcSession.destroy();
+ srcServer.destroy();
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////
+ //
+ // Tests
+ //
+ ////////////////////////////////////////////////////////////////////////////////
+
+ @Test
+ public void requireThatMessagesCanBeSentAcrossAllSupportedVersions() throws Exception {
+ List<Version> versions = Arrays.asList(new Version(5, 0), new Version(5, 1));
+ for (Version srcVersion : versions) {
+ for (Version itrVersion : versions) {
+ for (Version dstVersion : versions) {
+ assertVersionedSend(srcVersion, itrVersion, dstVersion);
+ }
+ }
+ }
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////
+ //
+ // Utilities
+ //
+ ////////////////////////////////////////////////////////////////////////////////
+
+ private void assertVersionedSend(Version srcVersion, Version itrVersion, Version dstVersion) {
+ System.out.println("Sending from " + srcVersion + " through " + itrVersion + " to " + dstVersion + ":");
+ srcServer.net.setVersion(srcVersion);
+ itrServer.net.setVersion(itrVersion);
+ dstServer.net.setVersion(dstVersion);
+
+ Message msg = new SimpleMessage("foo");
+ msg.getTrace().setLevel(9);
+ assertTrue(srcSession.send(msg, Route.parse("itr/session dst/session")).isAccepted());
+ assertNotNull(msg = ((Receptor)itrSession.getMessageHandler()).getMessage(300));
+ System.out.println("\tMessage version " + srcProtocol.lastVersion + " serialized at source.");
+ Version minVersion = srcVersion.compareTo(itrVersion) < 0 ? srcVersion : itrVersion;
+ assertEquals(minVersion, srcProtocol.lastVersion);
+
+ System.out.println("\tMessage version " + itrProtocol.lastVersion + " reached intermediate.");
+ assertEquals(minVersion, itrProtocol.lastVersion);
+ itrSession.forward(msg);
+ assertNotNull(msg = ((Receptor)dstSession.getMessageHandler()).getMessage(300));
+ System.out.println("\tMessage version " + itrProtocol.lastVersion + " serialized at intermediate.");
+ minVersion = itrVersion.compareTo(dstVersion) < 0 ? itrVersion : dstVersion;
+ assertEquals(minVersion, itrProtocol.lastVersion);
+
+ System.out.println("\tMessage version " + dstProtocol.lastVersion + " reached destination.");
+ assertEquals(minVersion, dstProtocol.lastVersion);
+ Reply reply = new SimpleReply("bar");
+ reply.swapState(msg);
+ dstSession.reply(reply);
+ assertNotNull(reply = ((Receptor)itrSession.getReplyHandler()).getReply(300));
+ System.out.println("\tReply version " + dstProtocol.lastVersion + " serialized at destination.");
+ assertEquals(minVersion, dstProtocol.lastVersion);
+
+ System.out.println("\tReply version " + itrProtocol.lastVersion + " reached intermediate.");
+ assertEquals(minVersion, itrProtocol.lastVersion);
+ itrSession.forward(reply);
+ assertNotNull(((Receptor)srcSession.getReplyHandler()).getReply(300));
+ System.out.println("\tReply version " + itrProtocol.lastVersion + " serialized at intermediate.");
+ minVersion = srcVersion.compareTo(itrVersion) < 0 ? srcVersion : itrVersion;
+ assertEquals(minVersion, itrProtocol.lastVersion);
+
+ System.out.println("\tReply version " + srcProtocol.lastVersion + " reached source.");
+ assertEquals(minVersion, srcProtocol.lastVersion);
+ }
+
+ private static class TestProtocol extends SimpleProtocol {
+
+ Version lastVersion;
+
+ @Override
+ public byte[] encode(Version version, Routable routable) {
+ lastVersion = version;
+ return super.encode(version, routable);
+ }
+
+ public Routable decode(Version version, byte[] payload) {
+ lastVersion = version;
+ return super.decode(version, payload);
+ }
+ }
+}
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/ServiceAddressTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/ServiceAddressTestCase.java
index e69896e2bcf..dfc5b648eb5 100755
--- a/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/ServiceAddressTestCase.java
+++ b/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/ServiceAddressTestCase.java
@@ -1,90 +1,90 @@
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.messagebus.network.rpc;
-
-import com.yahoo.jrt.ListenFailedException;
-import com.yahoo.jrt.Spec;
-import com.yahoo.jrt.slobrok.api.Mirror;
-import com.yahoo.jrt.slobrok.server.Slobrok;
-import com.yahoo.messagebus.network.Identity;
-
-import java.net.UnknownHostException;
-
-/**
- * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
- */
-public class ServiceAddressTestCase extends junit.framework.TestCase {
-
- private Slobrok slobrok;
- private RPCNetwork network;
-
- public ServiceAddressTestCase(String msg) {
- super(msg);
- }
-
- public void setUp() throws ListenFailedException, UnknownHostException {
- slobrok = new Slobrok();
- network = new RPCNetwork(new RPCNetworkParams()
- .setIdentity(new Identity("foo"))
- .setSlobrokConfigId("raw:slobrok[1]\nslobrok[0].connectionspec \"" +
- new Spec("localhost", slobrok.port()).toString() + "\"\n"));
- }
-
- public void tearDown() {
- network.shutdown();
- slobrok.stop();
- }
-
- public void testAddrServiceAddress() {
- assertNullAddress("tcp");
- assertNullAddress("tcp/");
- assertNullAddress("tcp/localhost");
- assertNullAddress("tcp/localhost:");
- assertNullAddress("tcp/localhost:1977");
- assertNullAddress("tcp/localhost:1977/");
- assertAddress("tcp/localhost:1977/session", "tcp/localhost:1977", "session");
- assertNullAddress("tcp/localhost:/session");
- //assertNullAddress("tcp/:1977/session");
- assertNullAddress("tcp/:/session");
- }
-
- public void testNameServiceAddress() {
- network.unregisterSession("session");
- assertTrue(waitSlobrok("foo/session", 0));
- assertNullAddress("foo/session");
-
- network.registerSession("session");
- assertTrue(waitSlobrok("foo/session", 1));
- assertAddress("foo/session", network.getConnectionSpec(), "session");
- }
-
- private boolean waitSlobrok(String pattern, int num) {
- for (int i = 0; i < 1000 && !Thread.currentThread().isInterrupted(); ++i) {
- Mirror.Entry[] res = network.getMirror().lookup(pattern);
- if (res.length == num) {
- return true;
- }
- try {
- Thread.sleep(10);
- }
- catch (InterruptedException e) {
- // ignore
- }
- }
- return false;
- }
-
- private void assertNullAddress(String pattern) {
- assertNull(new RPCService(network.getMirror(), pattern).resolve());
- }
-
- private void assertAddress(String pattern, String expectedSpec, String expectedSession) {
- RPCService service = new RPCService(network.getMirror(), pattern);
- RPCServiceAddress obj = service.resolve();
- assertNotNull(obj);
- assertNotNull(obj.getConnectionSpec());
- assertEquals(expectedSpec, obj.getConnectionSpec().toString());
- if (expectedSession != null) {
- assertEquals(expectedSession, obj.getSessionName());
- }
- }
-}
+package com.yahoo.messagebus.network.rpc;
+
+import com.yahoo.jrt.ListenFailedException;
+import com.yahoo.jrt.Spec;
+import com.yahoo.jrt.slobrok.api.Mirror;
+import com.yahoo.jrt.slobrok.server.Slobrok;
+import com.yahoo.messagebus.network.Identity;
+
+import java.net.UnknownHostException;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public class ServiceAddressTestCase extends junit.framework.TestCase {
+
+ private Slobrok slobrok;
+ private RPCNetwork network;
+
+ public ServiceAddressTestCase(String msg) {
+ super(msg);
+ }
+
+ public void setUp() throws ListenFailedException, UnknownHostException {
+ slobrok = new Slobrok();
+ network = new RPCNetwork(new RPCNetworkParams()
+ .setIdentity(new Identity("foo"))
+ .setSlobrokConfigId("raw:slobrok[1]\nslobrok[0].connectionspec \"" +
+ new Spec("localhost", slobrok.port()).toString() + "\"\n"));
+ }
+
+ public void tearDown() {
+ network.shutdown();
+ slobrok.stop();
+ }
+
+ public void testAddrServiceAddress() {
+ assertNullAddress("tcp");
+ assertNullAddress("tcp/");
+ assertNullAddress("tcp/localhost");
+ assertNullAddress("tcp/localhost:");
+ assertNullAddress("tcp/localhost:1977");
+ assertNullAddress("tcp/localhost:1977/");
+ assertAddress("tcp/localhost:1977/session", "tcp/localhost:1977", "session");
+ assertNullAddress("tcp/localhost:/session");
+ //assertNullAddress("tcp/:1977/session");
+ assertNullAddress("tcp/:/session");
+ }
+
+ public void testNameServiceAddress() {
+ network.unregisterSession("session");
+ assertTrue(waitSlobrok("foo/session", 0));
+ assertNullAddress("foo/session");
+
+ network.registerSession("session");
+ assertTrue(waitSlobrok("foo/session", 1));
+ assertAddress("foo/session", network.getConnectionSpec(), "session");
+ }
+
+ private boolean waitSlobrok(String pattern, int num) {
+ for (int i = 0; i < 1000 && !Thread.currentThread().isInterrupted(); ++i) {
+ Mirror.Entry[] res = network.getMirror().lookup(pattern);
+ if (res.length == num) {
+ return true;
+ }
+ try {
+ Thread.sleep(10);
+ }
+ catch (InterruptedException e) {
+ // ignore
+ }
+ }
+ return false;
+ }
+
+ private void assertNullAddress(String pattern) {
+ assertNull(new RPCService(network.getMirror(), pattern).resolve());
+ }
+
+ private void assertAddress(String pattern, String expectedSpec, String expectedSession) {
+ RPCService service = new RPCService(network.getMirror(), pattern);
+ RPCServiceAddress obj = service.resolve();
+ assertNotNull(obj);
+ assertNotNull(obj.getConnectionSpec());
+ assertEquals(expectedSpec, obj.getConnectionSpec().toString());
+ if (expectedSession != null) {
+ assertEquals(expectedSession, obj.getSessionName());
+ }
+ }
+}
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/ServicePoolTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/ServicePoolTestCase.java
index 42580b963a5..a35f98183da 100644
--- a/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/ServicePoolTestCase.java
+++ b/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/ServicePoolTestCase.java
@@ -1,57 +1,57 @@
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.messagebus.network.rpc;
-
-import com.yahoo.jrt.ListenFailedException;
-import com.yahoo.jrt.slobrok.server.Slobrok;
-import com.yahoo.messagebus.network.rpc.test.TestServer;
-import junit.framework.TestCase;
-
-/**
- * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
- */
-public class ServicePoolTestCase extends TestCase {
-
- public void testMaxSize() throws ListenFailedException {
- Slobrok slobrok = new Slobrok();
- RPCNetwork net = new RPCNetwork(new RPCNetworkParams().setSlobrokConfigId(TestServer.getSlobrokConfig(slobrok)));
- RPCServicePool pool = new RPCServicePool(net, 2);
-
- pool.resolve("foo");
- assertEquals(1, pool.getSize());
- assertTrue(pool.hasService("foo"));
- assertTrue(!pool.hasService("bar"));
- assertTrue(!pool.hasService("baz"));
-
- pool.resolve("foo");
- assertEquals(1, pool.getSize());
- assertTrue(pool.hasService("foo"));
- assertTrue(!pool.hasService("bar"));
- assertTrue(!pool.hasService("baz"));
-
- pool.resolve("bar");
- assertEquals(2, pool.getSize());
- assertTrue(pool.hasService("foo"));
- assertTrue(pool.hasService("bar"));
- assertTrue(!pool.hasService("baz"));
-
- pool.resolve("baz");
- assertEquals(2, pool.getSize());
- assertTrue(!pool.hasService("foo"));
- assertTrue(pool.hasService("bar"));
- assertTrue(pool.hasService("baz"));
-
- pool.resolve("bar");
- assertEquals(2, pool.getSize());
- assertTrue(!pool.hasService("foo"));
- assertTrue(pool.hasService("bar"));
- assertTrue(pool.hasService("baz"));
-
- pool.resolve("foo");
- assertEquals(2, pool.getSize());
- assertTrue(pool.hasService("foo"));
- assertTrue(pool.hasService("bar"));
- assertTrue(!pool.hasService("baz"));
-
- slobrok.stop();
- }
+package com.yahoo.messagebus.network.rpc;
+
+import com.yahoo.jrt.ListenFailedException;
+import com.yahoo.jrt.slobrok.server.Slobrok;
+import com.yahoo.messagebus.network.rpc.test.TestServer;
+import junit.framework.TestCase;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public class ServicePoolTestCase extends TestCase {
+
+ public void testMaxSize() throws ListenFailedException {
+ Slobrok slobrok = new Slobrok();
+ RPCNetwork net = new RPCNetwork(new RPCNetworkParams().setSlobrokConfigId(TestServer.getSlobrokConfig(slobrok)));
+ RPCServicePool pool = new RPCServicePool(net, 2);
+
+ pool.resolve("foo");
+ assertEquals(1, pool.getSize());
+ assertTrue(pool.hasService("foo"));
+ assertTrue(!pool.hasService("bar"));
+ assertTrue(!pool.hasService("baz"));
+
+ pool.resolve("foo");
+ assertEquals(1, pool.getSize());
+ assertTrue(pool.hasService("foo"));
+ assertTrue(!pool.hasService("bar"));
+ assertTrue(!pool.hasService("baz"));
+
+ pool.resolve("bar");
+ assertEquals(2, pool.getSize());
+ assertTrue(pool.hasService("foo"));
+ assertTrue(pool.hasService("bar"));
+ assertTrue(!pool.hasService("baz"));
+
+ pool.resolve("baz");
+ assertEquals(2, pool.getSize());
+ assertTrue(!pool.hasService("foo"));
+ assertTrue(pool.hasService("bar"));
+ assertTrue(pool.hasService("baz"));
+
+ pool.resolve("bar");
+ assertEquals(2, pool.getSize());
+ assertTrue(!pool.hasService("foo"));
+ assertTrue(pool.hasService("bar"));
+ assertTrue(pool.hasService("baz"));
+
+ pool.resolve("foo");
+ assertEquals(2, pool.getSize());
+ assertTrue(pool.hasService("foo"));
+ assertTrue(pool.hasService("bar"));
+ assertTrue(!pool.hasService("baz"));
+
+ slobrok.stop();
+ }
} \ No newline at end of file
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/TargetPoolTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/TargetPoolTestCase.java
index 8d58b4dd89f..821080524af 100755
--- a/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/TargetPoolTestCase.java
+++ b/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/TargetPoolTestCase.java
@@ -1,112 +1,112 @@
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.messagebus.network.rpc;
-
-import com.yahoo.jrt.ListenFailedException;
-import com.yahoo.jrt.Supervisor;
-import com.yahoo.jrt.Transport;
-import com.yahoo.jrt.slobrok.server.Slobrok;
-import com.yahoo.concurrent.Timer;
-import com.yahoo.messagebus.network.rpc.test.TestServer;
-
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
- */
-public class TargetPoolTestCase extends junit.framework.TestCase {
-
- private Slobrok slobrok;
- private List<TestServer> servers;
- private Supervisor orb;
-
- @Override
- public void setUp() throws ListenFailedException {
- slobrok = new Slobrok();
- servers = new ArrayList<>();
- orb = new Supervisor(new Transport());
- }
-
- @Override
- public void tearDown() {
- slobrok.stop();
- for (TestServer server : servers) {
- server.destroy();
- }
- orb.transport().shutdown().join();
- }
-
- public void testConnectionExpire() throws ListenFailedException, UnknownHostException {
- // Necessary setup to be able to resolve targets.
- RPCServiceAddress adr1 = registerServer();
- RPCServiceAddress adr2 = registerServer();
- RPCServiceAddress adr3 = registerServer();
-
- PoolTimer timer = new PoolTimer();
- RPCTargetPool pool = new RPCTargetPool(timer, 0.666);
-
- // Assert that all connections expire.
- RPCTarget target;
- assertNotNull(target = pool.getTarget(orb, adr1)); target.subRef();
- assertNotNull(target = pool.getTarget(orb, adr2)); target.subRef();
- assertNotNull(target = pool.getTarget(orb, adr3)); target.subRef();
- assertEquals(3, pool.size());
- for (int i = 0; i < 10; ++i) {
- pool.flushTargets(false);
- assertEquals(3, pool.size());
- }
- timer.millis += 999;
- pool.flushTargets(false);
- assertEquals(0, pool.size());
-
- // Assert that only idle connections expire.
- assertNotNull(target = pool.getTarget(orb, adr1)); target.subRef();
- assertNotNull(target = pool.getTarget(orb, adr2)); target.subRef();
- assertNotNull(target = pool.getTarget(orb, adr3)); target.subRef();
- assertEquals(3, pool.size());
- timer.millis += 444;
- pool.flushTargets(false);
- assertEquals(3, pool.size());
- assertNotNull(target = pool.getTarget(orb, adr2)); target.subRef();
- assertNotNull(target = pool.getTarget(orb, adr3)); target.subRef();
- assertEquals(3, pool.size());
- timer.millis += 444;
- pool.flushTargets(false);
- assertEquals(2, pool.size());
- assertNotNull(target = pool.getTarget(orb, adr3)); target.subRef();
- timer.millis += 444;
- pool.flushTargets(false);
- assertEquals(1, pool.size());
- timer.millis += 444;
- pool.flushTargets(false);
- assertEquals(0, pool.size());
-
- // Assert that connections never expire while they are referenced.
- assertNotNull(target = pool.getTarget(orb, adr1));
- assertEquals(1, pool.size());
- for (int i = 0; i < 10; ++i) {
- timer.millis += 999;
- pool.flushTargets(false);
- assertEquals(1, pool.size());
- }
- target.subRef();
- timer.millis += 999;
- pool.flushTargets(false);
- assertEquals(0, pool.size());
- }
-
- private RPCServiceAddress registerServer() throws ListenFailedException, UnknownHostException {
- servers.add(new TestServer("srv" + servers.size(), null, slobrok, null, null));
- return new RPCServiceAddress("foo/bar", servers.get(servers.size() - 1).mb.getConnectionSpec());
- }
-
- private static class PoolTimer implements Timer {
- long millis = 0;
-
- @Override
- public long milliTime() {
- return millis;
- }
- }
+package com.yahoo.messagebus.network.rpc;
+
+import com.yahoo.jrt.ListenFailedException;
+import com.yahoo.jrt.Supervisor;
+import com.yahoo.jrt.Transport;
+import com.yahoo.jrt.slobrok.server.Slobrok;
+import com.yahoo.concurrent.Timer;
+import com.yahoo.messagebus.network.rpc.test.TestServer;
+
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public class TargetPoolTestCase extends junit.framework.TestCase {
+
+ private Slobrok slobrok;
+ private List<TestServer> servers;
+ private Supervisor orb;
+
+ @Override
+ public void setUp() throws ListenFailedException {
+ slobrok = new Slobrok();
+ servers = new ArrayList<>();
+ orb = new Supervisor(new Transport());
+ }
+
+ @Override
+ public void tearDown() {
+ slobrok.stop();
+ for (TestServer server : servers) {
+ server.destroy();
+ }
+ orb.transport().shutdown().join();
+ }
+
+ public void testConnectionExpire() throws ListenFailedException, UnknownHostException {
+ // Necessary setup to be able to resolve targets.
+ RPCServiceAddress adr1 = registerServer();
+ RPCServiceAddress adr2 = registerServer();
+ RPCServiceAddress adr3 = registerServer();
+
+ PoolTimer timer = new PoolTimer();
+ RPCTargetPool pool = new RPCTargetPool(timer, 0.666);
+
+ // Assert that all connections expire.
+ RPCTarget target;
+ assertNotNull(target = pool.getTarget(orb, adr1)); target.subRef();
+ assertNotNull(target = pool.getTarget(orb, adr2)); target.subRef();
+ assertNotNull(target = pool.getTarget(orb, adr3)); target.subRef();
+ assertEquals(3, pool.size());
+ for (int i = 0; i < 10; ++i) {
+ pool.flushTargets(false);
+ assertEquals(3, pool.size());
+ }
+ timer.millis += 999;
+ pool.flushTargets(false);
+ assertEquals(0, pool.size());
+
+ // Assert that only idle connections expire.
+ assertNotNull(target = pool.getTarget(orb, adr1)); target.subRef();
+ assertNotNull(target = pool.getTarget(orb, adr2)); target.subRef();
+ assertNotNull(target = pool.getTarget(orb, adr3)); target.subRef();
+ assertEquals(3, pool.size());
+ timer.millis += 444;
+ pool.flushTargets(false);
+ assertEquals(3, pool.size());
+ assertNotNull(target = pool.getTarget(orb, adr2)); target.subRef();
+ assertNotNull(target = pool.getTarget(orb, adr3)); target.subRef();
+ assertEquals(3, pool.size());
+ timer.millis += 444;
+ pool.flushTargets(false);
+ assertEquals(2, pool.size());
+ assertNotNull(target = pool.getTarget(orb, adr3)); target.subRef();
+ timer.millis += 444;
+ pool.flushTargets(false);
+ assertEquals(1, pool.size());
+ timer.millis += 444;
+ pool.flushTargets(false);
+ assertEquals(0, pool.size());
+
+ // Assert that connections never expire while they are referenced.
+ assertNotNull(target = pool.getTarget(orb, adr1));
+ assertEquals(1, pool.size());
+ for (int i = 0; i < 10; ++i) {
+ timer.millis += 999;
+ pool.flushTargets(false);
+ assertEquals(1, pool.size());
+ }
+ target.subRef();
+ timer.millis += 999;
+ pool.flushTargets(false);
+ assertEquals(0, pool.size());
+ }
+
+ private RPCServiceAddress registerServer() throws ListenFailedException, UnknownHostException {
+ servers.add(new TestServer("srv" + servers.size(), null, slobrok, null, null));
+ return new RPCServiceAddress("foo/bar", servers.get(servers.size() - 1).mb.getConnectionSpec());
+ }
+
+ private static class PoolTimer implements Timer {
+ long millis = 0;
+
+ @Override
+ public long milliTime() {
+ return millis;
+ }
+ }
} \ No newline at end of file
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/routing/RetryPolicyTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/routing/RetryPolicyTestCase.java
index ad13f3325c7..141681a277a 100644
--- a/messagebus/src/test/java/com/yahoo/messagebus/routing/RetryPolicyTestCase.java
+++ b/messagebus/src/test/java/com/yahoo/messagebus/routing/RetryPolicyTestCase.java
@@ -1,32 +1,32 @@
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.messagebus.routing;
-
-import com.yahoo.messagebus.ErrorCode;
-import junit.framework.TestCase;
-
-/**
- * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
- */
-public class RetryPolicyTestCase extends TestCase {
-
- public void testSimpleRetryPolicy() {
- RetryTransientErrorsPolicy policy = new RetryTransientErrorsPolicy();
- for (int i = 0; i < 5; ++i) {
- double delay = i / 3.0;
- policy.setBaseDelay(delay);
- for (int j = 0; j < 5; ++j) {
- assertEquals((int)(j * delay), (int)policy.getRetryDelay(j));
- }
- for (int j = ErrorCode.NONE; j < ErrorCode.ERROR_LIMIT; ++j) {
- policy.setEnabled(true);
- if (j < ErrorCode.FATAL_ERROR) {
- assertTrue(policy.canRetry(j));
- } else {
- assertFalse(policy.canRetry(j));
- }
- policy.setEnabled(false);
- assertFalse(policy.canRetry(j));
- }
- }
- }
-}
+package com.yahoo.messagebus.routing;
+
+import com.yahoo.messagebus.ErrorCode;
+import junit.framework.TestCase;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public class RetryPolicyTestCase extends TestCase {
+
+ public void testSimpleRetryPolicy() {
+ RetryTransientErrorsPolicy policy = new RetryTransientErrorsPolicy();
+ for (int i = 0; i < 5; ++i) {
+ double delay = i / 3.0;
+ policy.setBaseDelay(delay);
+ for (int j = 0; j < 5; ++j) {
+ assertEquals((int)(j * delay), (int)policy.getRetryDelay(j));
+ }
+ for (int j = ErrorCode.NONE; j < ErrorCode.ERROR_LIMIT; ++j) {
+ policy.setEnabled(true);
+ if (j < ErrorCode.FATAL_ERROR) {
+ assertTrue(policy.canRetry(j));
+ } else {
+ assertFalse(policy.canRetry(j));
+ }
+ policy.setEnabled(false);
+ assertFalse(policy.canRetry(j));
+ }
+ }
+ }
+}
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/routing/RoutingSpecTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/routing/RoutingSpecTestCase.java
index 3cbb9d86e44..ada8fcfce29 100755
--- a/messagebus/src/test/java/com/yahoo/messagebus/routing/RoutingSpecTestCase.java
+++ b/messagebus/src/test/java/com/yahoo/messagebus/routing/RoutingSpecTestCase.java
@@ -1,336 +1,336 @@
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.messagebus.routing;
-
-import com.yahoo.messagebus.ConfigAgent;
-import com.yahoo.messagebus.ConfigHandler;
-import junit.framework.TestCase;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
- */
-public class RoutingSpecTestCase extends TestCase {
-
- ////////////////////////////////////////////////////////////////////////////////
- //
- // Tests
- //
- ////////////////////////////////////////////////////////////////////////////////
-
- public void testConfig() {
- assertConfig(new RoutingSpec());
- assertConfig(new RoutingSpec().addTable(new RoutingTableSpec("mytable1")));
- assertConfig(new RoutingSpec().addTable(new RoutingTableSpec("mytable1")
- .addHop(new HopSpec("myhop1", "myselector1"))));
- assertConfig(new RoutingSpec().addTable(new RoutingTableSpec("mytable1")
- .addHop(new HopSpec("myhop1", "myselector1"))
- .addRoute(new RouteSpec("myroute1").addHop("myhop1"))));
- assertConfig(new RoutingSpec().addTable(new RoutingTableSpec("mytable1")
- .addHop(new HopSpec("myhop1", "myselector1"))
- .addHop(new HopSpec("myhop2", "myselector2"))
- .addRoute(new RouteSpec("myroute1").addHop("myhop1"))
- .addRoute(new RouteSpec("myroute2").addHop("myhop2"))
- .addRoute(new RouteSpec("myroute12").addHop("myhop1").addHop("myhop2"))));
- assertConfig(new RoutingSpec().addTable(new RoutingTableSpec("mytable1")
- .addHop(new HopSpec("myhop1", "myselector1"))
- .addHop(new HopSpec("myhop2", "myselector2"))
- .addRoute(new RouteSpec("myroute1").addHop("myhop1"))
- .addRoute(new RouteSpec("myroute2").addHop("myhop2"))
- .addRoute(new RouteSpec("myroute12").addHop("myhop1").addHop("myhop2")))
- .addTable(new RoutingTableSpec("mytable2")));
- assertEquals("routingtable[2]\n" +
- "routingtable[0].protocol \"mytable1\"\n" +
- "routingtable[1].protocol \"mytable2\"\n" +
- "routingtable[1].hop[3]\n" +
- "routingtable[1].hop[0].name \"myhop1\"\n" +
- "routingtable[1].hop[0].selector \"myselector1\"\n" +
- "routingtable[1].hop[1].name \"myhop2\"\n" +
- "routingtable[1].hop[1].selector \"myselector2\"\n" +
- "routingtable[1].hop[1].ignoreresult true\n" +
- "routingtable[1].hop[2].name \"myhop1\"\n" +
- "routingtable[1].hop[2].selector \"myselector3\"\n" +
- "routingtable[1].hop[2].recipient[2]\n" +
- "routingtable[1].hop[2].recipient[0] \"myrecipient1\"\n" +
- "routingtable[1].hop[2].recipient[1] \"myrecipient2\"\n" +
- "routingtable[1].route[1]\n" +
- "routingtable[1].route[0].name \"myroute1\"\n" +
- "routingtable[1].route[0].hop[1]\n" +
- "routingtable[1].route[0].hop[0] \"myhop1\"\n",
- new RoutingSpec()
- .addTable(new RoutingTableSpec("mytable1"))
- .addTable(new RoutingTableSpec("mytable2")
- .addHop(new HopSpec("myhop1", "myselector1"))
- .addHop(new HopSpec("myhop2", "myselector2").setIgnoreResult(true))
- .addHop(new HopSpec("myhop1", "myselector3")
- .addRecipient("myrecipient1")
- .addRecipient("myrecipient2"))
- .addRoute(new RouteSpec("myroute1").addHop("myhop1"))).toString());
- }
-
- public void testApplicationSpec() {
- assertApplicationSpec(Arrays.asList("foo"),
- Arrays.asList("foo",
- "*"));
- assertApplicationSpec(Arrays.asList("foo/bar"),
- Arrays.asList("foo/bar",
- "foo/*",
- "*/bar",
- "*/*"));
- assertApplicationSpec(Arrays.asList("foo/0/baz",
- "foo/1/baz",
- "foo/2/baz"),
- Arrays.asList("foo/0/baz",
- "foo/1/baz",
- "foo/2/baz",
- "foo/0/*",
- "foo/1/*",
- "foo/2/*",
- "foo/*/baz",
- "*/0/baz",
- "*/1/baz",
- "*/2/baz",
- "foo/*/*",
- "*/0/*",
- "*/1/*",
- "*/2/*",
- "*/*/baz",
- "*/*/*"));
- }
-
- public void testVeriyfOk() {
- assertVerifyOk(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
- .addHop(new HopSpec("hop1", "myservice1"))),
- new ApplicationSpec().addService("mytable", "myservice1"));
- assertVerifyOk(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
- .addRoute(new RouteSpec("route1").addHop("myservice1"))),
- new ApplicationSpec().addService("mytable", "myservice1"));
- assertVerifyOk(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
- .addHop(new HopSpec("hop1", "myservice1"))
- .addRoute(new RouteSpec("route1").addHop("hop1"))),
- new ApplicationSpec().addService("mytable", "myservice1"));
- assertVerifyOk(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
- .addHop(new HopSpec("hop1", "route:route2"))
- .addHop(new HopSpec("hop2", "myservice1"))
- .addRoute(new RouteSpec("route1").addHop("hop1"))
- .addRoute(new RouteSpec("route2").addHop("hop2"))),
- new ApplicationSpec().addService("mytable", "myservice1"));
- assertVerifyOk(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
- .addHop(new HopSpec("myhop1", "foo/[bar]/baz").addRecipient("foo/0/baz").addRecipient("foo/1/baz"))),
- new ApplicationSpec()
- .addService("mytable", "foo/0/baz")
- .addService("mytable", "foo/1/baz"));
- }
-
- public void testVerifyToggle() {
- assertVerifyOk(new RoutingSpec(false)
- .addTable(new RoutingTableSpec("mytable"))
- .addTable(new RoutingTableSpec("mytable")),
- new ApplicationSpec());
- assertVerifyOk(new RoutingSpec().addTable(new RoutingTableSpec("mytable", false)
- .addHop(new HopSpec("foo", "bar"))
- .addHop(new HopSpec("foo", "baz"))),
- new ApplicationSpec());
- assertVerifyOk(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
- .addHop(new HopSpec("foo", "", false))),
- new ApplicationSpec());
- assertVerifyOk(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
- .addRoute(new RouteSpec("foo", false))),
- new ApplicationSpec());
- }
-
- public void testVerifyFail() {
- // Duplicate table.
- assertVerifyFail(new RoutingSpec()
- .addTable(new RoutingTableSpec("mytable"))
- .addTable(new RoutingTableSpec("mytable")),
- new ApplicationSpec(),
- Arrays.asList("Routing table 'mytable' is defined 2 times."));
-
- // Duplicate hop.
- assertVerifyFail(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
- .addHop(new HopSpec("foo", "bar"))
- .addHop(new HopSpec("foo", "baz"))),
- new ApplicationSpec()
- .addService("mytable", "bar")
- .addService("mytable", "baz"),
- Arrays.asList("Hop 'foo' in routing table 'mytable' is defined 2 times."));
-
- // Duplicate route.
- assertVerifyFail(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
- .addRoute(new RouteSpec("foo").addHop("bar"))
- .addRoute(new RouteSpec("foo").addHop("baz"))),
- new ApplicationSpec()
- .addService("mytable", "bar")
- .addService("mytable", "baz"),
- Arrays.asList("Route 'foo' in routing table 'mytable' is defined 2 times."));
-
- // Empty hop.
- assertVerifyFail(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
- .addHop(new HopSpec("foo", ""))),
- new ApplicationSpec(),
- Arrays.asList("For hop 'foo' in routing table 'mytable'; Failed to parse empty string."));
-
- // Empty route.
- assertVerifyFail(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
- .addRoute(new RouteSpec("foo"))),
- new ApplicationSpec(),
- Arrays.asList("Route 'foo' in routing table 'mytable' has no hops."));
-
- // Hop error.
- assertVerifyFail(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
- .addHop(new HopSpec("foo", "bar/baz cox"))),
- new ApplicationSpec(),
- Arrays.asList("For hop 'foo' in routing table 'mytable'; Failed to completely parse 'bar/baz cox'."));
-
- // Hop error in recipient.
- assertVerifyFail(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
- .addHop(new HopSpec("foo", "[bar]").addRecipient("bar/baz cox"))),
- new ApplicationSpec(),
- Arrays.asList("For recipient 'bar/baz cox' in hop 'foo' in routing table 'mytable'; Failed to completely parse 'bar/baz cox'."));
-
- // Hop error in route.
- assertVerifyFail(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
- .addRoute(new RouteSpec("foo").addHop("bar/baz cox"))),
- new ApplicationSpec(),
- Arrays.asList("For hop 1 in route 'foo' in routing table 'mytable'; Failed to completely parse 'bar/baz cox'."));
-
- // Hop not found.
- assertVerifyFail(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
- .addRoute(new RouteSpec("foo").addHop("bar"))),
- new ApplicationSpec(),
- Arrays.asList("Hop 1 in route 'foo' in routing table 'mytable' references 'bar' which is neither a service, a route nor another hop."));
-
- // Mismatched recipient.
- assertVerifyFail(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
- .addHop(new HopSpec("foo", "bar/[baz]/cox").addRecipient("cox/0/bar"))),
- new ApplicationSpec(),
- Arrays.asList("Selector 'bar/[baz]/cox' does not match recipient 'cox/0/bar' in hop 'foo' in routing table 'mytable'."));
-
- // Route not found.
- assertVerifyFail(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
- .addHop(new HopSpec("foo", "route:bar"))),
- new ApplicationSpec(),
- Arrays.asList("Hop 'foo' in routing table 'mytable' references route 'bar' which does not exist."));
-
- // Route not found in route.
- assertVerifyFail(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
- .addRoute(new RouteSpec("foo").addHop("route:bar"))),
- new ApplicationSpec(),
- Arrays.asList("Hop 1 in route 'foo' in routing table 'mytable' references route 'bar' which does not exist."));
-
- // Service not found.
- assertVerifyFail(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
- .addHop(new HopSpec("foo", "bar/baz"))),
- new ApplicationSpec(),
- Arrays.asList("Hop 'foo' in routing table 'mytable' references 'bar/baz' which is neither a service, a route nor another hop."));
-
- // Unexpected recipient.
- assertVerifyFail(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
- .addHop(new HopSpec("foo", "bar").addRecipient("baz"))),
- new ApplicationSpec()
- .addService("mytable", "bar")
- .addService("mytable", "baz"),
- Arrays.asList("Hop 'foo' in routing table 'mytable' has recipients but no policy directive."));
-
- // Multiple errors.
- assertVerifyFail(new RoutingSpec()
- .addTable(new RoutingTableSpec("mytable"))
- .addTable(new RoutingTableSpec("mytable")
- .addHop(new HopSpec("hop1", "bar"))
- .addHop(new HopSpec("hop1", "baz"))
- .addHop(new HopSpec("hop2", ""))
- .addHop(new HopSpec("hop3", "bar/baz cox"))
- .addHop(new HopSpec("hop4", "[bar]").addRecipient("bar/baz cox"))
- .addHop(new HopSpec("hop5", "bar/[baz]/cox").addRecipient("cox/0/bar"))
- .addHop(new HopSpec("hop6", "route:route69"))
- .addHop(new HopSpec("hop7", "bar/baz"))
- .addHop(new HopSpec("hop8", "bar").addRecipient("baz"))
- .addRoute(new RouteSpec("route1").addHop("bar"))
- .addRoute(new RouteSpec("route1").addHop("baz"))
- .addRoute(new RouteSpec("route2").addHop(""))
- .addRoute(new RouteSpec("route3").addHop("bar/baz cox"))
- .addRoute(new RouteSpec("route4").addHop("hop69"))
- .addRoute(new RouteSpec("route5").addHop("route:route69"))),
- new ApplicationSpec()
- .addService("mytable", "bar")
- .addService("mytable", "baz"),
- Arrays.asList("Routing table 'mytable' is defined 2 times.",
- "For hop 'hop2' in routing table 'mytable'; Failed to parse empty string.",
- "For hop 'hop3' in routing table 'mytable'; Failed to completely parse 'bar/baz cox'.",
- "For hop 1 in route 'route2' in routing table 'mytable'; Failed to parse empty string.",
- "For hop 1 in route 'route3' in routing table 'mytable'; Failed to completely parse 'bar/baz cox'.",
- "For recipient 'bar/baz cox' in hop 'hop4' in routing table 'mytable'; Failed to completely parse 'bar/baz cox'.",
- "Hop 'hop1' in routing table 'mytable' is defined 2 times.",
- "Hop 'hop6' in routing table 'mytable' references route 'route69' which does not exist.",
- "Hop 'hop7' in routing table 'mytable' references 'bar/baz' which is neither a service, a route nor another hop.",
- "Hop 'hop8' in routing table 'mytable' has recipients but no policy directive.",
- "Hop 1 in route 'route4' in routing table 'mytable' references 'hop69' which is neither a service, a route nor another hop.",
- "Hop 1 in route 'route5' in routing table 'mytable' references route 'route69' which does not exist.",
- "Route 'route1' in routing table 'mytable' is defined 2 times.",
- "Selector 'bar/[baz]/cox' does not match recipient 'cox/0/bar' in hop 'hop5' in routing table 'mytable'."));
- }
-
- ////////////////////////////////////////////////////////////////////////////////
- //
- // Utilities
- //
- ////////////////////////////////////////////////////////////////////////////////
-
- private static void assertVerifyOk(RoutingSpec routing, ApplicationSpec app) {
- assertVerifyFail(routing, app, new ArrayList<String>());
- }
-
- private static void assertVerifyFail(RoutingSpec routing, ApplicationSpec app, List<String> expectedErrors) {
- List<String> errors = new ArrayList<>();
- routing.verify(app, errors);
-
- Collections.sort(errors);
- Collections.sort(expectedErrors);
- assertEquals(expectedErrors.toString(), errors.toString());
- }
-
- private static void assertConfig(RoutingSpec routing) {
- assertEquals(routing, routing);
- assertEquals(routing, new RoutingSpec(routing));
-
- ConfigStore store = new ConfigStore();
- ConfigAgent subscriber = new ConfigAgent("raw:" + routing.toString(), store);
- subscriber.subscribe();
- assertTrue(store.routing.equals(routing));
- }
-
- private static void assertApplicationSpec(List<String> services, List<String> patterns) {
- ApplicationSpec app = new ApplicationSpec();
- for (String pattern : patterns) {
- assertFalse(app.isService("foo", pattern));
- assertFalse(app.isService("bar", pattern));
- }
- for (String service : services) {
- app.addService("foo", service);
- }
- for (String pattern : patterns) {
- assertTrue(app.isService("foo", pattern));
- assertFalse(app.isService("bar", pattern));
- }
- for (String service : services) {
- app.addService("bar", service);
- }
- for (String pattern : patterns) {
- assertTrue(app.isService("foo", pattern));
- assertTrue(app.isService("bar", pattern));
- }
- }
-
- private static class ConfigStore implements ConfigHandler {
-
- RoutingSpec routing = null;
-
- public void setupRouting(RoutingSpec routing) {
- this.routing = routing;
- }
- }
-}
+package com.yahoo.messagebus.routing;
+
+import com.yahoo.messagebus.ConfigAgent;
+import com.yahoo.messagebus.ConfigHandler;
+import junit.framework.TestCase;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public class RoutingSpecTestCase extends TestCase {
+
+ ////////////////////////////////////////////////////////////////////////////////
+ //
+ // Tests
+ //
+ ////////////////////////////////////////////////////////////////////////////////
+
+ public void testConfig() {
+ assertConfig(new RoutingSpec());
+ assertConfig(new RoutingSpec().addTable(new RoutingTableSpec("mytable1")));
+ assertConfig(new RoutingSpec().addTable(new RoutingTableSpec("mytable1")
+ .addHop(new HopSpec("myhop1", "myselector1"))));
+ assertConfig(new RoutingSpec().addTable(new RoutingTableSpec("mytable1")
+ .addHop(new HopSpec("myhop1", "myselector1"))
+ .addRoute(new RouteSpec("myroute1").addHop("myhop1"))));
+ assertConfig(new RoutingSpec().addTable(new RoutingTableSpec("mytable1")
+ .addHop(new HopSpec("myhop1", "myselector1"))
+ .addHop(new HopSpec("myhop2", "myselector2"))
+ .addRoute(new RouteSpec("myroute1").addHop("myhop1"))
+ .addRoute(new RouteSpec("myroute2").addHop("myhop2"))
+ .addRoute(new RouteSpec("myroute12").addHop("myhop1").addHop("myhop2"))));
+ assertConfig(new RoutingSpec().addTable(new RoutingTableSpec("mytable1")
+ .addHop(new HopSpec("myhop1", "myselector1"))
+ .addHop(new HopSpec("myhop2", "myselector2"))
+ .addRoute(new RouteSpec("myroute1").addHop("myhop1"))
+ .addRoute(new RouteSpec("myroute2").addHop("myhop2"))
+ .addRoute(new RouteSpec("myroute12").addHop("myhop1").addHop("myhop2")))
+ .addTable(new RoutingTableSpec("mytable2")));
+ assertEquals("routingtable[2]\n" +
+ "routingtable[0].protocol \"mytable1\"\n" +
+ "routingtable[1].protocol \"mytable2\"\n" +
+ "routingtable[1].hop[3]\n" +
+ "routingtable[1].hop[0].name \"myhop1\"\n" +
+ "routingtable[1].hop[0].selector \"myselector1\"\n" +
+ "routingtable[1].hop[1].name \"myhop2\"\n" +
+ "routingtable[1].hop[1].selector \"myselector2\"\n" +
+ "routingtable[1].hop[1].ignoreresult true\n" +
+ "routingtable[1].hop[2].name \"myhop1\"\n" +
+ "routingtable[1].hop[2].selector \"myselector3\"\n" +
+ "routingtable[1].hop[2].recipient[2]\n" +
+ "routingtable[1].hop[2].recipient[0] \"myrecipient1\"\n" +
+ "routingtable[1].hop[2].recipient[1] \"myrecipient2\"\n" +
+ "routingtable[1].route[1]\n" +
+ "routingtable[1].route[0].name \"myroute1\"\n" +
+ "routingtable[1].route[0].hop[1]\n" +
+ "routingtable[1].route[0].hop[0] \"myhop1\"\n",
+ new RoutingSpec()
+ .addTable(new RoutingTableSpec("mytable1"))
+ .addTable(new RoutingTableSpec("mytable2")
+ .addHop(new HopSpec("myhop1", "myselector1"))
+ .addHop(new HopSpec("myhop2", "myselector2").setIgnoreResult(true))
+ .addHop(new HopSpec("myhop1", "myselector3")
+ .addRecipient("myrecipient1")
+ .addRecipient("myrecipient2"))
+ .addRoute(new RouteSpec("myroute1").addHop("myhop1"))).toString());
+ }
+
+ public void testApplicationSpec() {
+ assertApplicationSpec(Arrays.asList("foo"),
+ Arrays.asList("foo",
+ "*"));
+ assertApplicationSpec(Arrays.asList("foo/bar"),
+ Arrays.asList("foo/bar",
+ "foo/*",
+ "*/bar",
+ "*/*"));
+ assertApplicationSpec(Arrays.asList("foo/0/baz",
+ "foo/1/baz",
+ "foo/2/baz"),
+ Arrays.asList("foo/0/baz",
+ "foo/1/baz",
+ "foo/2/baz",
+ "foo/0/*",
+ "foo/1/*",
+ "foo/2/*",
+ "foo/*/baz",
+ "*/0/baz",
+ "*/1/baz",
+ "*/2/baz",
+ "foo/*/*",
+ "*/0/*",
+ "*/1/*",
+ "*/2/*",
+ "*/*/baz",
+ "*/*/*"));
+ }
+
+ public void testVeriyfOk() {
+ assertVerifyOk(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
+ .addHop(new HopSpec("hop1", "myservice1"))),
+ new ApplicationSpec().addService("mytable", "myservice1"));
+ assertVerifyOk(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
+ .addRoute(new RouteSpec("route1").addHop("myservice1"))),
+ new ApplicationSpec().addService("mytable", "myservice1"));
+ assertVerifyOk(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
+ .addHop(new HopSpec("hop1", "myservice1"))
+ .addRoute(new RouteSpec("route1").addHop("hop1"))),
+ new ApplicationSpec().addService("mytable", "myservice1"));
+ assertVerifyOk(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
+ .addHop(new HopSpec("hop1", "route:route2"))
+ .addHop(new HopSpec("hop2", "myservice1"))
+ .addRoute(new RouteSpec("route1").addHop("hop1"))
+ .addRoute(new RouteSpec("route2").addHop("hop2"))),
+ new ApplicationSpec().addService("mytable", "myservice1"));
+ assertVerifyOk(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
+ .addHop(new HopSpec("myhop1", "foo/[bar]/baz").addRecipient("foo/0/baz").addRecipient("foo/1/baz"))),
+ new ApplicationSpec()
+ .addService("mytable", "foo/0/baz")
+ .addService("mytable", "foo/1/baz"));
+ }
+
+ public void testVerifyToggle() {
+ assertVerifyOk(new RoutingSpec(false)
+ .addTable(new RoutingTableSpec("mytable"))
+ .addTable(new RoutingTableSpec("mytable")),
+ new ApplicationSpec());
+ assertVerifyOk(new RoutingSpec().addTable(new RoutingTableSpec("mytable", false)
+ .addHop(new HopSpec("foo", "bar"))
+ .addHop(new HopSpec("foo", "baz"))),
+ new ApplicationSpec());
+ assertVerifyOk(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
+ .addHop(new HopSpec("foo", "", false))),
+ new ApplicationSpec());
+ assertVerifyOk(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
+ .addRoute(new RouteSpec("foo", false))),
+ new ApplicationSpec());
+ }
+
+ public void testVerifyFail() {
+ // Duplicate table.
+ assertVerifyFail(new RoutingSpec()
+ .addTable(new RoutingTableSpec("mytable"))
+ .addTable(new RoutingTableSpec("mytable")),
+ new ApplicationSpec(),
+ Arrays.asList("Routing table 'mytable' is defined 2 times."));
+
+ // Duplicate hop.
+ assertVerifyFail(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
+ .addHop(new HopSpec("foo", "bar"))
+ .addHop(new HopSpec("foo", "baz"))),
+ new ApplicationSpec()
+ .addService("mytable", "bar")
+ .addService("mytable", "baz"),
+ Arrays.asList("Hop 'foo' in routing table 'mytable' is defined 2 times."));
+
+ // Duplicate route.
+ assertVerifyFail(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
+ .addRoute(new RouteSpec("foo").addHop("bar"))
+ .addRoute(new RouteSpec("foo").addHop("baz"))),
+ new ApplicationSpec()
+ .addService("mytable", "bar")
+ .addService("mytable", "baz"),
+ Arrays.asList("Route 'foo' in routing table 'mytable' is defined 2 times."));
+
+ // Empty hop.
+ assertVerifyFail(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
+ .addHop(new HopSpec("foo", ""))),
+ new ApplicationSpec(),
+ Arrays.asList("For hop 'foo' in routing table 'mytable'; Failed to parse empty string."));
+
+ // Empty route.
+ assertVerifyFail(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
+ .addRoute(new RouteSpec("foo"))),
+ new ApplicationSpec(),
+ Arrays.asList("Route 'foo' in routing table 'mytable' has no hops."));
+
+ // Hop error.
+ assertVerifyFail(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
+ .addHop(new HopSpec("foo", "bar/baz cox"))),
+ new ApplicationSpec(),
+ Arrays.asList("For hop 'foo' in routing table 'mytable'; Failed to completely parse 'bar/baz cox'."));
+
+ // Hop error in recipient.
+ assertVerifyFail(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
+ .addHop(new HopSpec("foo", "[bar]").addRecipient("bar/baz cox"))),
+ new ApplicationSpec(),
+ Arrays.asList("For recipient 'bar/baz cox' in hop 'foo' in routing table 'mytable'; Failed to completely parse 'bar/baz cox'."));
+
+ // Hop error in route.
+ assertVerifyFail(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
+ .addRoute(new RouteSpec("foo").addHop("bar/baz cox"))),
+ new ApplicationSpec(),
+ Arrays.asList("For hop 1 in route 'foo' in routing table 'mytable'; Failed to completely parse 'bar/baz cox'."));
+
+ // Hop not found.
+ assertVerifyFail(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
+ .addRoute(new RouteSpec("foo").addHop("bar"))),
+ new ApplicationSpec(),
+ Arrays.asList("Hop 1 in route 'foo' in routing table 'mytable' references 'bar' which is neither a service, a route nor another hop."));
+
+ // Mismatched recipient.
+ assertVerifyFail(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
+ .addHop(new HopSpec("foo", "bar/[baz]/cox").addRecipient("cox/0/bar"))),
+ new ApplicationSpec(),
+ Arrays.asList("Selector 'bar/[baz]/cox' does not match recipient 'cox/0/bar' in hop 'foo' in routing table 'mytable'."));
+
+ // Route not found.
+ assertVerifyFail(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
+ .addHop(new HopSpec("foo", "route:bar"))),
+ new ApplicationSpec(),
+ Arrays.asList("Hop 'foo' in routing table 'mytable' references route 'bar' which does not exist."));
+
+ // Route not found in route.
+ assertVerifyFail(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
+ .addRoute(new RouteSpec("foo").addHop("route:bar"))),
+ new ApplicationSpec(),
+ Arrays.asList("Hop 1 in route 'foo' in routing table 'mytable' references route 'bar' which does not exist."));
+
+ // Service not found.
+ assertVerifyFail(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
+ .addHop(new HopSpec("foo", "bar/baz"))),
+ new ApplicationSpec(),
+ Arrays.asList("Hop 'foo' in routing table 'mytable' references 'bar/baz' which is neither a service, a route nor another hop."));
+
+ // Unexpected recipient.
+ assertVerifyFail(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
+ .addHop(new HopSpec("foo", "bar").addRecipient("baz"))),
+ new ApplicationSpec()
+ .addService("mytable", "bar")
+ .addService("mytable", "baz"),
+ Arrays.asList("Hop 'foo' in routing table 'mytable' has recipients but no policy directive."));
+
+ // Multiple errors.
+ assertVerifyFail(new RoutingSpec()
+ .addTable(new RoutingTableSpec("mytable"))
+ .addTable(new RoutingTableSpec("mytable")
+ .addHop(new HopSpec("hop1", "bar"))
+ .addHop(new HopSpec("hop1", "baz"))
+ .addHop(new HopSpec("hop2", ""))
+ .addHop(new HopSpec("hop3", "bar/baz cox"))
+ .addHop(new HopSpec("hop4", "[bar]").addRecipient("bar/baz cox"))
+ .addHop(new HopSpec("hop5", "bar/[baz]/cox").addRecipient("cox/0/bar"))
+ .addHop(new HopSpec("hop6", "route:route69"))
+ .addHop(new HopSpec("hop7", "bar/baz"))
+ .addHop(new HopSpec("hop8", "bar").addRecipient("baz"))
+ .addRoute(new RouteSpec("route1").addHop("bar"))
+ .addRoute(new RouteSpec("route1").addHop("baz"))
+ .addRoute(new RouteSpec("route2").addHop(""))
+ .addRoute(new RouteSpec("route3").addHop("bar/baz cox"))
+ .addRoute(new RouteSpec("route4").addHop("hop69"))
+ .addRoute(new RouteSpec("route5").addHop("route:route69"))),
+ new ApplicationSpec()
+ .addService("mytable", "bar")
+ .addService("mytable", "baz"),
+ Arrays.asList("Routing table 'mytable' is defined 2 times.",
+ "For hop 'hop2' in routing table 'mytable'; Failed to parse empty string.",
+ "For hop 'hop3' in routing table 'mytable'; Failed to completely parse 'bar/baz cox'.",
+ "For hop 1 in route 'route2' in routing table 'mytable'; Failed to parse empty string.",
+ "For hop 1 in route 'route3' in routing table 'mytable'; Failed to completely parse 'bar/baz cox'.",
+ "For recipient 'bar/baz cox' in hop 'hop4' in routing table 'mytable'; Failed to completely parse 'bar/baz cox'.",
+ "Hop 'hop1' in routing table 'mytable' is defined 2 times.",
+ "Hop 'hop6' in routing table 'mytable' references route 'route69' which does not exist.",
+ "Hop 'hop7' in routing table 'mytable' references 'bar/baz' which is neither a service, a route nor another hop.",
+ "Hop 'hop8' in routing table 'mytable' has recipients but no policy directive.",
+ "Hop 1 in route 'route4' in routing table 'mytable' references 'hop69' which is neither a service, a route nor another hop.",
+ "Hop 1 in route 'route5' in routing table 'mytable' references route 'route69' which does not exist.",
+ "Route 'route1' in routing table 'mytable' is defined 2 times.",
+ "Selector 'bar/[baz]/cox' does not match recipient 'cox/0/bar' in hop 'hop5' in routing table 'mytable'."));
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////
+ //
+ // Utilities
+ //
+ ////////////////////////////////////////////////////////////////////////////////
+
+ private static void assertVerifyOk(RoutingSpec routing, ApplicationSpec app) {
+ assertVerifyFail(routing, app, new ArrayList<String>());
+ }
+
+ private static void assertVerifyFail(RoutingSpec routing, ApplicationSpec app, List<String> expectedErrors) {
+ List<String> errors = new ArrayList<>();
+ routing.verify(app, errors);
+
+ Collections.sort(errors);
+ Collections.sort(expectedErrors);
+ assertEquals(expectedErrors.toString(), errors.toString());
+ }
+
+ private static void assertConfig(RoutingSpec routing) {
+ assertEquals(routing, routing);
+ assertEquals(routing, new RoutingSpec(routing));
+
+ ConfigStore store = new ConfigStore();
+ ConfigAgent subscriber = new ConfigAgent("raw:" + routing.toString(), store);
+ subscriber.subscribe();
+ assertTrue(store.routing.equals(routing));
+ }
+
+ private static void assertApplicationSpec(List<String> services, List<String> patterns) {
+ ApplicationSpec app = new ApplicationSpec();
+ for (String pattern : patterns) {
+ assertFalse(app.isService("foo", pattern));
+ assertFalse(app.isService("bar", pattern));
+ }
+ for (String service : services) {
+ app.addService("foo", service);
+ }
+ for (String pattern : patterns) {
+ assertTrue(app.isService("foo", pattern));
+ assertFalse(app.isService("bar", pattern));
+ }
+ for (String service : services) {
+ app.addService("bar", service);
+ }
+ for (String pattern : patterns) {
+ assertTrue(app.isService("foo", pattern));
+ assertTrue(app.isService("bar", pattern));
+ }
+ }
+
+ private static class ConfigStore implements ConfigHandler {
+
+ RoutingSpec routing = null;
+
+ public void setupRouting(RoutingSpec routing) {
+ this.routing = routing;
+ }
+ }
+}