diff options
author | Arne H Juul <arnej@yahoo-inc.com> | 2017-01-03 14:41:04 +0100 |
---|---|---|
committer | Arne H Juul <arnej@yahoo-inc.com> | 2017-01-03 14:41:04 +0100 |
commit | c73ae73b3c7bc8ebe7eb17fe12afdef1976563c7 (patch) | |
tree | 480337b08ec6c62bbcf2aea8132b1d3cdc4fd7e8 | |
parent | 8b423e4c115d647307349e8be621fe189f27cb2c (diff) |
whitespace fixups.
29 files changed, 2400 insertions, 2400 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/DynamicThrottlePolicy.java b/messagebus/src/main/java/com/yahoo/messagebus/DynamicThrottlePolicy.java index 9d6af3a84a9..57ff0028585 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/DynamicThrottlePolicy.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/DynamicThrottlePolicy.java @@ -1,256 +1,256 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.messagebus;
-
-import com.yahoo.concurrent.SystemTimer;
-import com.yahoo.concurrent.Timer;
-import com.yahoo.log.LogLevel;
-import java.util.logging.Logger;
-
-/**
- * This is an implementatin of the {@link ThrottlePolicy} that offers dynamic limits to the number of pending messages a
- * {@link SourceSession} is allowed to have.
- *
- * <b>NOTE:</b> By context, "pending" is refering to the number of sent messages that have not been replied to yet.
- *
- * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
- */
-public class DynamicThrottlePolicy extends StaticThrottlePolicy {
-
- private static final long IDLE_TIME_MILLIS = 60000;
- private final Timer timer;
- private int numSent = 0;
- private int numOk = 0;
- private double resizeRate = 3;
- private long resizeTime = 0;
- private long timeOfLastMessage;
- private double efficiencyThreshold = 1.0;
- private double windowSizeIncrement = 20;
- private double windowSize = windowSizeIncrement;
- private double minWindowSize = windowSizeIncrement;
- private double maxWindowSize = Integer.MAX_VALUE;
- private double windowSizeBackOff = 0.9;
- private double weight = 1.0;
- private double localMaxThroughput = 0;
- private double maxThroughput = 0;
- private static final Logger log = Logger.getLogger(DynamicThrottlePolicy.class.getName());
-
- /**
- * Constructs a new instance of this policy and sets the appropriate default values of member data.
- */
- public DynamicThrottlePolicy() {
- this(SystemTimer.INSTANCE);
- }
-
- /**
- * Constructs a new instance of this class using the given clock to calculate efficiency.
- *
- * @param timer The timer to use.
- */
- public DynamicThrottlePolicy(Timer timer) {
- this.timer = timer;
- this.timeOfLastMessage = timer.milliTime();
- }
-
- public double getWindowSizeIncrement() {
- return windowSizeIncrement;
- }
-
- public double getWindowSizeBackOff() {
- return windowSizeBackOff;
- }
-
- public void setMaxThroughput(double maxThroughput) {
- this.maxThroughput = maxThroughput;
- }
-
- @Override
- public boolean canSend(Message msg, int pendingCount) {
- if (!super.canSend(msg, pendingCount)) {
- return false;
- }
- long time = timer.milliTime();
- double elapsed = (time - timeOfLastMessage);
- if (elapsed > IDLE_TIME_MILLIS) {
- windowSize = Math.min(windowSize, pendingCount + windowSizeIncrement);
- }
- timeOfLastMessage = time;
- return pendingCount < windowSize;
- }
-
- @Override
- public void processMessage(Message msg) {
- super.processMessage(msg);
- if (++numSent < windowSize * resizeRate) {
- return;
- }
-
- long time = timer.milliTime();
- double elapsed = time - resizeTime;
- resizeTime = time;
-
- double throughput = numOk / elapsed;
- numSent = 0;
- numOk = 0;
-
- if (log.isLoggable(LogLevel.DEBUG)) {
- log.log(LogLevel.DEBUG, "windowSize " + windowSize + " throughput " + throughput);
- }
-
- if (maxThroughput > 0 && throughput > maxThroughput * 0.95) {
- // No need to increase window when we're this close to max.
- } else if (throughput > localMaxThroughput * 1.01) {
- localMaxThroughput = throughput;
- windowSize += weight*windowSizeIncrement;
- } else {
- // scale up/down throughput for comparing to window size
- double period = 1;
- while(throughput * period/windowSize < 2) {
- period *= 10;
- }
- while(throughput * period/windowSize > 2) {
- period *= 0.1;
- }
- double efficiency = throughput*period/windowSize;
- if (efficiency < efficiencyThreshold) {
- double newSize = Math.min(windowSize,throughput * period);
- windowSize = Math.min(windowSize * windowSizeBackOff, windowSize - 2* windowSizeIncrement);
- localMaxThroughput = 0;
- } else {
- windowSize += weight*windowSizeIncrement;
- }
- }
- windowSize = Math.max(minWindowSize, windowSize);
- windowSize = Math.min(maxWindowSize, windowSize);
- }
-
- @Override
- public void processReply(Reply reply) {
- super.processReply(reply);
- if (!reply.hasErrors()) {
- ++numOk;
- }
- }
-
- /**
- * Sets the lower efficiency threshold at which the algorithm should perform window size back off. Efficiency is
- * the correlation between throughput and window size. The algorithm will increase the window size until efficiency
- * drops below the efficiency of the local maxima times this value.
- *
- * @param efficiencyThreshold The limit to set.
- * @return This, to allow chaining.
- * @see #setWindowSizeBackOff(double)
- */
- public DynamicThrottlePolicy setEfficiencyThreshold(double efficiencyThreshold) {
- this.efficiencyThreshold = efficiencyThreshold;
- return this;
- }
-
- /**
- * Sets the step size used when increasing window size.
- *
- * @param windowSizeIncrement The step size to set.
- * @return This, to allow chaining.
- */
- public DynamicThrottlePolicy setWindowSizeIncrement(double windowSizeIncrement) {
- this.windowSizeIncrement = windowSizeIncrement;
- return this;
- }
-
- /**
- * Sets the factor of window size to back off to when the algorithm determines that efficiency is not increasing.
- * A value of 1 means that there is no back off from the local maxima, and means that the algorithm will fail to
- * reduce window size to something lower than a previous maxima. This value is capped to the [0, 1] range.
- *
- * @param windowSizeBackOff The back off to set.
- * @return This, to allow chaining.
- */
- public DynamicThrottlePolicy setWindowSizeBackOff(double windowSizeBackOff) {
- this.windowSizeBackOff = Math.max(0, Math.min(1, windowSizeBackOff));
- return this;
- }
-
- /**
- * Sets the rate at which the window size is updated. The larger the value, the less responsive the resizing
- * becomes. However, the smaller the value, the less accurate the measurements become.
- *
- * @param resizeRate The rate to set.
- * @return This, to allow chaining.
- */
- public DynamicThrottlePolicy setResizeRate(double resizeRate) {
- this.resizeRate = resizeRate;
- return this;
- }
-
- /**
- * Sets the weight for this client. The larger the value, the more resources
- * will be allocated to this clients. Resources are shared between clients
- * proportiannally to their weights.
- *
- * @param weight The weight to set.
- * @return This, to allow chaining.
- */
- public DynamicThrottlePolicy setWeight(double weight) {
- this.weight = weight;
- return this;
- }
-
- /**
- * Sets the maximium number of pending operations allowed at any time, in
- * order to avoid using too much resources.
- *
- * @param max The max to set.
- * @return This, to allow chaining.
- */
- public DynamicThrottlePolicy setMaxWindowSize(double max) {
- this.maxWindowSize = max;
- return this;
- }
-
- /**
- * Get the maximum number of pending operations allowed at any time.
- *
- * @return The maximum number of operations.
- */
- public double getMaxWindowSize() {
- return maxWindowSize;
- }
-
-
- /**
- * Sets the minimium number of pending operations allowed at any time, in
- * order to keep a level of performance.
- *
- * @param min The min to set.
- * @return This, to allow chaining.
- */
- public DynamicThrottlePolicy setMinWindowSize(double min) {
- this.minWindowSize = min;
- return this;
- }
-
- /**
- * Get the minimum number of pending operations allowed at any time.
- *
- * @return The minimum number of operations.
- */
- public double getMinWindowSize() {
- return minWindowSize;
- }
-
- public DynamicThrottlePolicy setMaxPendingCount(int maxCount) {
- super.setMaxPendingCount(maxCount);
- maxWindowSize = maxCount;
- return this;
- }
-
-
- /**
- * Returns the maximum number of pending messages allowed.
- *
- * @return The max limit.
- */
- public int getMaxPendingCount() {
- return (int)windowSize;
- }
-
-}
+package com.yahoo.messagebus; + +import com.yahoo.concurrent.SystemTimer; +import com.yahoo.concurrent.Timer; +import com.yahoo.log.LogLevel; +import java.util.logging.Logger; + +/** + * This is an implementatin of the {@link ThrottlePolicy} that offers dynamic limits to the number of pending messages a + * {@link SourceSession} is allowed to have. + * + * <b>NOTE:</b> By context, "pending" is refering to the number of sent messages that have not been replied to yet. + * + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +public class DynamicThrottlePolicy extends StaticThrottlePolicy { + + private static final long IDLE_TIME_MILLIS = 60000; + private final Timer timer; + private int numSent = 0; + private int numOk = 0; + private double resizeRate = 3; + private long resizeTime = 0; + private long timeOfLastMessage; + private double efficiencyThreshold = 1.0; + private double windowSizeIncrement = 20; + private double windowSize = windowSizeIncrement; + private double minWindowSize = windowSizeIncrement; + private double maxWindowSize = Integer.MAX_VALUE; + private double windowSizeBackOff = 0.9; + private double weight = 1.0; + private double localMaxThroughput = 0; + private double maxThroughput = 0; + private static final Logger log = Logger.getLogger(DynamicThrottlePolicy.class.getName()); + + /** + * Constructs a new instance of this policy and sets the appropriate default values of member data. + */ + public DynamicThrottlePolicy() { + this(SystemTimer.INSTANCE); + } + + /** + * Constructs a new instance of this class using the given clock to calculate efficiency. + * + * @param timer The timer to use. + */ + public DynamicThrottlePolicy(Timer timer) { + this.timer = timer; + this.timeOfLastMessage = timer.milliTime(); + } + + public double getWindowSizeIncrement() { + return windowSizeIncrement; + } + + public double getWindowSizeBackOff() { + return windowSizeBackOff; + } + + public void setMaxThroughput(double maxThroughput) { + this.maxThroughput = maxThroughput; + } + + @Override + public boolean canSend(Message msg, int pendingCount) { + if (!super.canSend(msg, pendingCount)) { + return false; + } + long time = timer.milliTime(); + double elapsed = (time - timeOfLastMessage); + if (elapsed > IDLE_TIME_MILLIS) { + windowSize = Math.min(windowSize, pendingCount + windowSizeIncrement); + } + timeOfLastMessage = time; + return pendingCount < windowSize; + } + + @Override + public void processMessage(Message msg) { + super.processMessage(msg); + if (++numSent < windowSize * resizeRate) { + return; + } + + long time = timer.milliTime(); + double elapsed = time - resizeTime; + resizeTime = time; + + double throughput = numOk / elapsed; + numSent = 0; + numOk = 0; + + if (log.isLoggable(LogLevel.DEBUG)) { + log.log(LogLevel.DEBUG, "windowSize " + windowSize + " throughput " + throughput); + } + + if (maxThroughput > 0 && throughput > maxThroughput * 0.95) { + // No need to increase window when we're this close to max. + } else if (throughput > localMaxThroughput * 1.01) { + localMaxThroughput = throughput; + windowSize += weight*windowSizeIncrement; + } else { + // scale up/down throughput for comparing to window size + double period = 1; + while(throughput * period/windowSize < 2) { + period *= 10; + } + while(throughput * period/windowSize > 2) { + period *= 0.1; + } + double efficiency = throughput*period/windowSize; + if (efficiency < efficiencyThreshold) { + double newSize = Math.min(windowSize,throughput * period); + windowSize = Math.min(windowSize * windowSizeBackOff, windowSize - 2* windowSizeIncrement); + localMaxThroughput = 0; + } else { + windowSize += weight*windowSizeIncrement; + } + } + windowSize = Math.max(minWindowSize, windowSize); + windowSize = Math.min(maxWindowSize, windowSize); + } + + @Override + public void processReply(Reply reply) { + super.processReply(reply); + if (!reply.hasErrors()) { + ++numOk; + } + } + + /** + * Sets the lower efficiency threshold at which the algorithm should perform window size back off. Efficiency is + * the correlation between throughput and window size. The algorithm will increase the window size until efficiency + * drops below the efficiency of the local maxima times this value. + * + * @param efficiencyThreshold The limit to set. + * @return This, to allow chaining. + * @see #setWindowSizeBackOff(double) + */ + public DynamicThrottlePolicy setEfficiencyThreshold(double efficiencyThreshold) { + this.efficiencyThreshold = efficiencyThreshold; + return this; + } + + /** + * Sets the step size used when increasing window size. + * + * @param windowSizeIncrement The step size to set. + * @return This, to allow chaining. + */ + public DynamicThrottlePolicy setWindowSizeIncrement(double windowSizeIncrement) { + this.windowSizeIncrement = windowSizeIncrement; + return this; + } + + /** + * Sets the factor of window size to back off to when the algorithm determines that efficiency is not increasing. + * A value of 1 means that there is no back off from the local maxima, and means that the algorithm will fail to + * reduce window size to something lower than a previous maxima. This value is capped to the [0, 1] range. + * + * @param windowSizeBackOff The back off to set. + * @return This, to allow chaining. + */ + public DynamicThrottlePolicy setWindowSizeBackOff(double windowSizeBackOff) { + this.windowSizeBackOff = Math.max(0, Math.min(1, windowSizeBackOff)); + return this; + } + + /** + * Sets the rate at which the window size is updated. The larger the value, the less responsive the resizing + * becomes. However, the smaller the value, the less accurate the measurements become. + * + * @param resizeRate The rate to set. + * @return This, to allow chaining. + */ + public DynamicThrottlePolicy setResizeRate(double resizeRate) { + this.resizeRate = resizeRate; + return this; + } + + /** + * Sets the weight for this client. The larger the value, the more resources + * will be allocated to this clients. Resources are shared between clients + * proportiannally to their weights. + * + * @param weight The weight to set. + * @return This, to allow chaining. + */ + public DynamicThrottlePolicy setWeight(double weight) { + this.weight = weight; + return this; + } + + /** + * Sets the maximium number of pending operations allowed at any time, in + * order to avoid using too much resources. + * + * @param max The max to set. + * @return This, to allow chaining. + */ + public DynamicThrottlePolicy setMaxWindowSize(double max) { + this.maxWindowSize = max; + return this; + } + + /** + * Get the maximum number of pending operations allowed at any time. + * + * @return The maximum number of operations. + */ + public double getMaxWindowSize() { + return maxWindowSize; + } + + + /** + * Sets the minimium number of pending operations allowed at any time, in + * order to keep a level of performance. + * + * @param min The min to set. + * @return This, to allow chaining. + */ + public DynamicThrottlePolicy setMinWindowSize(double min) { + this.minWindowSize = min; + return this; + } + + /** + * Get the minimum number of pending operations allowed at any time. + * + * @return The minimum number of operations. + */ + public double getMinWindowSize() { + return minWindowSize; + } + + public DynamicThrottlePolicy setMaxPendingCount(int maxCount) { + super.setMaxPendingCount(maxCount); + maxWindowSize = maxCount; + return this; + } + + + /** + * Returns the maximum number of pending messages allowed. + * + * @return The max limit. + */ + public int getMaxPendingCount() { + return (int)windowSize; + } + +} diff --git a/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java b/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java index cf5beb4a903..91d3ba966c3 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java @@ -35,7 +35,7 @@ import java.util.logging.Logger; * messaging semantics of each hop.</p> * * The responsibilities of a message bus are: - * <ul> + * <ul> * <li>Assign a route to every send message from its routing table * <li>Deliver every message it <i>accepts</i> to the next hop on its route * <i>or</i> deliver a <i>failure reply</i>. diff --git a/messagebus/src/main/java/com/yahoo/messagebus/NetworkMessageBus.java b/messagebus/src/main/java/com/yahoo/messagebus/NetworkMessageBus.java index 24e177f1fbf..310bbf2c3a6 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/NetworkMessageBus.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/NetworkMessageBus.java @@ -7,7 +7,7 @@ import java.util.concurrent.atomic.AtomicBoolean; /** * The combination of a messagebus and a network over which it may send data. - * + * * @author bratseth */ public class NetworkMessageBus { @@ -16,12 +16,12 @@ public class NetworkMessageBus { private final MessageBus messageBus; private final AtomicBoolean destroyed = new AtomicBoolean(false); - + public NetworkMessageBus(Network network, MessageBus messageBus) { this.network = network; this.messageBus = messageBus; } - + /** Returns the contained message bus object */ public MessageBus getMessageBus() { return messageBus; } @@ -30,14 +30,14 @@ public class NetworkMessageBus { /** * Irreversibly destroys the content of this. - * + * * @return whether this destroyed anything, or if it was already destroyed */ public boolean destroy() { if ( destroyed.getAndSet(true)) return false; - + getMessageBus().destroy(); return true; } - + } diff --git a/messagebus/src/main/java/com/yahoo/messagebus/ProtocolRepository.java b/messagebus/src/main/java/com/yahoo/messagebus/ProtocolRepository.java index 949a8486fb7..503119d9350 100755 --- a/messagebus/src/main/java/com/yahoo/messagebus/ProtocolRepository.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/ProtocolRepository.java @@ -1,109 +1,109 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.messagebus;
-
-import com.yahoo.concurrent.CopyOnWriteHashMap;
-import com.yahoo.log.LogLevel;
-import com.yahoo.messagebus.routing.RoutingPolicy;
-import com.yahoo.text.Utf8String;
-
-import java.util.logging.Logger;
-
-/**
- * Implements a thread-safe repository for protocols and their routing policies. This manages an internal cache of
- * routing policies so that similarly referenced policy directives share the same instance of a policy.
- *
- * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
- */
-public class ProtocolRepository {
-
- private static final Logger log = Logger.getLogger(ProtocolRepository.class.getName());
- private final CopyOnWriteHashMap<String, Protocol> protocols = new CopyOnWriteHashMap<>();
- private final CopyOnWriteHashMap<String, RoutingPolicy> routingPolicyCache = new CopyOnWriteHashMap<>();
-
- /**
- * Registers a protocol with this repository. This will overwrite any protocol that was registered earlier that has
- * the same name. If this method detects a protocol replacement, it will clear its internal routing policy cache.
- *
- * @param protocol The protocol to register.
- */
- public void putProtocol(Protocol protocol) {
- if (protocols.put(protocol.getName(), protocol) != null) {
- routingPolicyCache.clear();
- }
- }
-
- /**
- * Returns whether or not this repository contains a protocol with the given name. Given the concurrent nature of
- * things, one should not invoke this method followed by {@link #getProtocol(String)} and expect the return value to
- * be non-null. Instead just get the protocol and compare it to null.
- *
- * @param name The name to check for.
- * @return True if the named protocol is registered.
- */
- public boolean hasProtocol(String name) {
- return protocols.containsKey(name);
- }
-
- /**
- * Returns the protocol whose name matches the given argument. This method will return null if no such protocol has
- * been registered.
- *
- * @param name The name of the protocol to return.
- * @return The protocol registered, or null.
- */
- public Protocol getProtocol(String name) {
- return protocols.get(name);
- }
-
- /**
- * Creates and returns a routing policy that matches the given arguments. If a routing policy has been created
- * previously using the exact same parameters, this method will returned that cached instance instead of creating
- * another. Not that when you replace a protocol using {@link #putProtocol(Protocol)} the policy cache is cleared.
- *
- * @param protocolName The name of the protocol whose routing policy to create.
- * @param policyName The name of the routing policy to create.
- * @param policyParam The parameter to pass to the routing policy constructor.
- * @return The created routing policy.
- */
- public RoutingPolicy getRoutingPolicy(String protocolName, String policyName, String policyParam) {
- String cacheKey = protocolName + "." + policyName + "." + policyParam;
- RoutingPolicy ret = routingPolicyCache.get(cacheKey);
- if (ret != null) {
- return ret;
- }
- synchronized (this) {
- Protocol protocol = getProtocol(protocolName);
- if (protocol == null) {
- log.log(LogLevel.ERROR, "Protocol '" + protocolName + "' not supported.");
- return null;
- }
- try {
- ret = protocol.createPolicy(policyName, policyParam);
- } catch (RuntimeException e) {
- log.log(LogLevel.ERROR, "Protcol '" + protocolName + "' threw an exception: " + e.getMessage(), e);
- return null;
- }
- if (ret == null) {
- log.log(LogLevel.ERROR, "Protocol '" + protocolName + "' failed to create routing policy '" + policyName +
- "' with parameter '" + policyParam + "'.");
- return null;
- }
- routingPolicyCache.put(cacheKey, ret);
- }
- return ret;
- }
-
- public final RoutingPolicy getRoutingPolicy(Utf8String protocolName, String policyName, String policyParam) {
- return getRoutingPolicy(protocolName.toString(), policyName, policyParam);
- }
-
- /**
- * Clears the internal cache of routing policies.
- */
- public synchronized void clearPolicyCache() {
- for (RoutingPolicy policy : routingPolicyCache.values()) {
- policy.destroy();
- }
- routingPolicyCache.clear();
- }
-}
+package com.yahoo.messagebus; + +import com.yahoo.concurrent.CopyOnWriteHashMap; +import com.yahoo.log.LogLevel; +import com.yahoo.messagebus.routing.RoutingPolicy; +import com.yahoo.text.Utf8String; + +import java.util.logging.Logger; + +/** + * Implements a thread-safe repository for protocols and their routing policies. This manages an internal cache of + * routing policies so that similarly referenced policy directives share the same instance of a policy. + * + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +public class ProtocolRepository { + + private static final Logger log = Logger.getLogger(ProtocolRepository.class.getName()); + private final CopyOnWriteHashMap<String, Protocol> protocols = new CopyOnWriteHashMap<>(); + private final CopyOnWriteHashMap<String, RoutingPolicy> routingPolicyCache = new CopyOnWriteHashMap<>(); + + /** + * Registers a protocol with this repository. This will overwrite any protocol that was registered earlier that has + * the same name. If this method detects a protocol replacement, it will clear its internal routing policy cache. + * + * @param protocol The protocol to register. + */ + public void putProtocol(Protocol protocol) { + if (protocols.put(protocol.getName(), protocol) != null) { + routingPolicyCache.clear(); + } + } + + /** + * Returns whether or not this repository contains a protocol with the given name. Given the concurrent nature of + * things, one should not invoke this method followed by {@link #getProtocol(String)} and expect the return value to + * be non-null. Instead just get the protocol and compare it to null. + * + * @param name The name to check for. + * @return True if the named protocol is registered. + */ + public boolean hasProtocol(String name) { + return protocols.containsKey(name); + } + + /** + * Returns the protocol whose name matches the given argument. This method will return null if no such protocol has + * been registered. + * + * @param name The name of the protocol to return. + * @return The protocol registered, or null. + */ + public Protocol getProtocol(String name) { + return protocols.get(name); + } + + /** + * Creates and returns a routing policy that matches the given arguments. If a routing policy has been created + * previously using the exact same parameters, this method will returned that cached instance instead of creating + * another. Not that when you replace a protocol using {@link #putProtocol(Protocol)} the policy cache is cleared. + * + * @param protocolName The name of the protocol whose routing policy to create. + * @param policyName The name of the routing policy to create. + * @param policyParam The parameter to pass to the routing policy constructor. + * @return The created routing policy. + */ + public RoutingPolicy getRoutingPolicy(String protocolName, String policyName, String policyParam) { + String cacheKey = protocolName + "." + policyName + "." + policyParam; + RoutingPolicy ret = routingPolicyCache.get(cacheKey); + if (ret != null) { + return ret; + } + synchronized (this) { + Protocol protocol = getProtocol(protocolName); + if (protocol == null) { + log.log(LogLevel.ERROR, "Protocol '" + protocolName + "' not supported."); + return null; + } + try { + ret = protocol.createPolicy(policyName, policyParam); + } catch (RuntimeException e) { + log.log(LogLevel.ERROR, "Protcol '" + protocolName + "' threw an exception: " + e.getMessage(), e); + return null; + } + if (ret == null) { + log.log(LogLevel.ERROR, "Protocol '" + protocolName + "' failed to create routing policy '" + policyName + + "' with parameter '" + policyParam + "'."); + return null; + } + routingPolicyCache.put(cacheKey, ret); + } + return ret; + } + + public final RoutingPolicy getRoutingPolicy(Utf8String protocolName, String policyName, String policyParam) { + return getRoutingPolicy(protocolName.toString(), policyName, policyParam); + } + + /** + * Clears the internal cache of routing policies. + */ + public synchronized void clearPolicyCache() { + for (RoutingPolicy policy : routingPolicyCache.values()) { + policy.destroy(); + } + routingPolicyCache.clear(); + } +} diff --git a/messagebus/src/main/java/com/yahoo/messagebus/RPCMessageBus.java b/messagebus/src/main/java/com/yahoo/messagebus/RPCMessageBus.java index cfa50a35549..b0eb56ce657 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/RPCMessageBus.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/RPCMessageBus.java @@ -33,12 +33,12 @@ public class RPCMessageBus extends NetworkMessageBus { public RPCMessageBus(MessageBusParams mbusParams, RPCNetworkParams rpcParams, String routingCfgId) { this(mbusParams, new RPCNetwork(rpcParams), routingCfgId); } - + private RPCMessageBus(MessageBusParams mbusParams, RPCNetwork network, String routingCfgId) { this(new MessageBus(network, mbusParams), network, routingCfgId); } - - private RPCMessageBus(MessageBus messageBus, RPCNetwork network, String routingCfgId) { + + private RPCMessageBus(MessageBus messageBus, RPCNetwork network, String routingCfgId) { super(network, messageBus); configAgent = new ConfigAgent(routingCfgId != null ? routingCfgId : "client", messageBus); configAgent.subscribe(); diff --git a/messagebus/src/main/java/com/yahoo/messagebus/SendProxy.java b/messagebus/src/main/java/com/yahoo/messagebus/SendProxy.java index fe894427a13..aa091cc1f29 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/SendProxy.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/SendProxy.java @@ -1,93 +1,93 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.messagebus;
-
-import com.yahoo.concurrent.SystemTimer;
-import com.yahoo.messagebus.metrics.RouteMetricSet;
-import com.yahoo.messagebus.network.Network;
-import com.yahoo.messagebus.routing.Resender;
-import com.yahoo.messagebus.routing.RoutingNode;
-import com.yahoo.log.LogLevel;
-
-import java.util.logging.Logger;
-
-/**
- * This class owns a message that is being sent by message bus. Once a reply is received, the message is attached to it
- * and returned to the application. This also implements the discard policy of {@link RoutingNode}.
- *
- * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
- */
-public class SendProxy implements MessageHandler, ReplyHandler {
-
- private static final Logger log = Logger.getLogger(SendProxy.class.getName());
- private final MessageBus mbus;
- private final Network net;
- private final Resender resender;
- private Message msg = null;
- private boolean logTrace = false;
- private long sendTime = 0;
-
- /**
- * Constructs a new instance of this class to maintain sending of a single message.
- *
- * @param mbus The message bus that owns this.
- * @param net The network layer to transmit through.
- * @param resender The resender to use.
- */
- public SendProxy(MessageBus mbus, Network net, Resender resender) {
- this.mbus = mbus;
- this.net = net;
- this.resender = resender;
- sendTime = SystemTimer.INSTANCE.milliTime();
- }
-
- public void handleMessage(Message msg) {
- Trace trace = msg.getTrace();
- if (trace.getLevel() == 0) {
- if (log.isLoggable(LogLevel.SPAM)) {
- trace.setLevel(9);
- logTrace = true;
- } else if (log.isLoggable(LogLevel.DEBUG)) {
- trace.setLevel(6);
- logTrace = true;
- }
- }
- this.msg = msg;
- RoutingNode root = new RoutingNode(mbus, net, resender, this, msg);
- root.send();
- }
-
- public void handleReply(Reply reply) {
- if (reply == null) {
- msg.discard();
- } else {
- Trace trace = msg.getTrace();
- if (logTrace) {
- if (reply.hasErrors()) {
- log.log(LogLevel.DEBUG, "Trace for reply with error(s):\n" + reply.getTrace());
- } else if (log.isLoggable(LogLevel.SPAM)) {
- log.log(LogLevel.SPAM, "Trace for reply:\n" + reply.getTrace());
- }
- Trace empty = new Trace();
- trace.swap(empty);
- } else if (trace.getLevel() > 0) {
- trace.getRoot().addChild(reply.getTrace().getRoot());
- trace.getRoot().normalize();
- }
- reply.swapState(msg);
- reply.setMessage(msg);
-
- if (msg.getRoute() != null) {
- RouteMetricSet metrics = mbus.getMetrics().getRouteMetrics(msg.getRoute());
- for (int i = 0; i < reply.getNumErrors(); i++) {
- metrics.addFailure(reply.getError(i));
- }
- if (reply.getNumErrors() == 0) {
- metrics.latency.addValue(msg.getTimeReceived() - sendTime);
- }
- }
-
- ReplyHandler handler = reply.popHandler();
- handler.handleReply(reply);
- }
- }
-}
+package com.yahoo.messagebus; + +import com.yahoo.concurrent.SystemTimer; +import com.yahoo.messagebus.metrics.RouteMetricSet; +import com.yahoo.messagebus.network.Network; +import com.yahoo.messagebus.routing.Resender; +import com.yahoo.messagebus.routing.RoutingNode; +import com.yahoo.log.LogLevel; + +import java.util.logging.Logger; + +/** + * This class owns a message that is being sent by message bus. Once a reply is received, the message is attached to it + * and returned to the application. This also implements the discard policy of {@link RoutingNode}. + * + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +public class SendProxy implements MessageHandler, ReplyHandler { + + private static final Logger log = Logger.getLogger(SendProxy.class.getName()); + private final MessageBus mbus; + private final Network net; + private final Resender resender; + private Message msg = null; + private boolean logTrace = false; + private long sendTime = 0; + + /** + * Constructs a new instance of this class to maintain sending of a single message. + * + * @param mbus The message bus that owns this. + * @param net The network layer to transmit through. + * @param resender The resender to use. + */ + public SendProxy(MessageBus mbus, Network net, Resender resender) { + this.mbus = mbus; + this.net = net; + this.resender = resender; + sendTime = SystemTimer.INSTANCE.milliTime(); + } + + public void handleMessage(Message msg) { + Trace trace = msg.getTrace(); + if (trace.getLevel() == 0) { + if (log.isLoggable(LogLevel.SPAM)) { + trace.setLevel(9); + logTrace = true; + } else if (log.isLoggable(LogLevel.DEBUG)) { + trace.setLevel(6); + logTrace = true; + } + } + this.msg = msg; + RoutingNode root = new RoutingNode(mbus, net, resender, this, msg); + root.send(); + } + + public void handleReply(Reply reply) { + if (reply == null) { + msg.discard(); + } else { + Trace trace = msg.getTrace(); + if (logTrace) { + if (reply.hasErrors()) { + log.log(LogLevel.DEBUG, "Trace for reply with error(s):\n" + reply.getTrace()); + } else if (log.isLoggable(LogLevel.SPAM)) { + log.log(LogLevel.SPAM, "Trace for reply:\n" + reply.getTrace()); + } + Trace empty = new Trace(); + trace.swap(empty); + } else if (trace.getLevel() > 0) { + trace.getRoot().addChild(reply.getTrace().getRoot()); + trace.getRoot().normalize(); + } + reply.swapState(msg); + reply.setMessage(msg); + + if (msg.getRoute() != null) { + RouteMetricSet metrics = mbus.getMetrics().getRouteMetrics(msg.getRoute()); + for (int i = 0; i < reply.getNumErrors(); i++) { + metrics.addFailure(reply.getError(i)); + } + if (reply.getNumErrors() == 0) { + metrics.latency.addValue(msg.getTimeReceived() - sendTime); + } + } + + ReplyHandler handler = reply.popHandler(); + handler.handleReply(reply); + } + } +} diff --git a/messagebus/src/main/java/com/yahoo/messagebus/StaticThrottlePolicy.java b/messagebus/src/main/java/com/yahoo/messagebus/StaticThrottlePolicy.java index bb1ae9e69b3..f65b165a768 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/StaticThrottlePolicy.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/StaticThrottlePolicy.java @@ -1,91 +1,91 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.messagebus;
-
-/**
- * This is an implementatin of the {@link ThrottlePolicy} that offers static limits to the amount of pending data a
- * {@link SourceSession} is allowed to have. You may choose to set a limit to the total number of pending messages (by
- * way of {@link #setMaxPendingCount(int)}), the total size of pending messages (by way of {@link
- * #setMaxPendingSize(long)}), or some combination thereof.
- *
- * <b>NOTE:</b> By context, "pending" is refering to the number of sent messages that have not been replied to yet.
- *
- * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
- */
-public class StaticThrottlePolicy implements ThrottlePolicy {
-
- private int maxPendingCount = 0;
- private long maxPendingSize = 0;
- private long pendingSize = 0;
-
- public boolean canSend(Message msg, int pendingCount) {
- if (maxPendingCount > 0 && pendingCount >= maxPendingCount) {
- return false;
- }
- if (maxPendingSize > 0 && pendingSize >= maxPendingSize) {
- return false;
- }
- return true;
- }
-
- public void processMessage(Message msg) {
- int size = msg.getApproxSize();
- msg.setContext(size);
- pendingSize += size;
- }
-
- public void processReply(Reply reply) {
- int size = (Integer)reply.getContext();
- pendingSize -= size;
- }
-
- /**
- * Returns the maximum number of pending messages allowed.
- *
- * @return The max limit.
- */
- public int getMaxPendingCount() {
- return maxPendingCount;
- }
-
- /**
- * Sets the maximum number of pending messages allowed.
- *
- * @param maxCount The max count.
- * @return This, to allow chaining.
- */
- public StaticThrottlePolicy setMaxPendingCount(int maxCount) {
- maxPendingCount = maxCount;
- return this;
- }
-
- /**
- * Returns the maximum total size of pending messages allowed.
- *
- * @return The max limit.
- */
- public long getMaxPendingSize() {
- return maxPendingSize;
- }
-
- /**
- * Sets the maximum total size of pending messages allowed. This size is relative to the value returned by {@link
- * com.yahoo.messagebus.Message#getApproxSize()}.
- *
- * @param maxSize The max size.
- * @return This, to allow chaining.
- */
- public StaticThrottlePolicy setMaxPendingSize(long maxSize) {
- maxPendingSize = maxSize;
- return this;
- }
-
- /**
- * Returns the total size of pending messages.
- *
- * @return The size.
- */
- public long getPendingSize() {
- return pendingSize;
- }
-
-}
+package com.yahoo.messagebus; + +/** + * This is an implementatin of the {@link ThrottlePolicy} that offers static limits to the amount of pending data a + * {@link SourceSession} is allowed to have. You may choose to set a limit to the total number of pending messages (by + * way of {@link #setMaxPendingCount(int)}), the total size of pending messages (by way of {@link + * #setMaxPendingSize(long)}), or some combination thereof. + * + * <b>NOTE:</b> By context, "pending" is refering to the number of sent messages that have not been replied to yet. + * + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +public class StaticThrottlePolicy implements ThrottlePolicy { + + private int maxPendingCount = 0; + private long maxPendingSize = 0; + private long pendingSize = 0; + + public boolean canSend(Message msg, int pendingCount) { + if (maxPendingCount > 0 && pendingCount >= maxPendingCount) { + return false; + } + if (maxPendingSize > 0 && pendingSize >= maxPendingSize) { + return false; + } + return true; + } + + public void processMessage(Message msg) { + int size = msg.getApproxSize(); + msg.setContext(size); + pendingSize += size; + } + + public void processReply(Reply reply) { + int size = (Integer)reply.getContext(); + pendingSize -= size; + } + + /** + * Returns the maximum number of pending messages allowed. + * + * @return The max limit. + */ + public int getMaxPendingCount() { + return maxPendingCount; + } + + /** + * Sets the maximum number of pending messages allowed. + * + * @param maxCount The max count. + * @return This, to allow chaining. + */ + public StaticThrottlePolicy setMaxPendingCount(int maxCount) { + maxPendingCount = maxCount; + return this; + } + + /** + * Returns the maximum total size of pending messages allowed. + * + * @return The max limit. + */ + public long getMaxPendingSize() { + return maxPendingSize; + } + + /** + * Sets the maximum total size of pending messages allowed. This size is relative to the value returned by {@link + * com.yahoo.messagebus.Message#getApproxSize()}. + * + * @param maxSize The max size. + * @return This, to allow chaining. + */ + public StaticThrottlePolicy setMaxPendingSize(long maxSize) { + maxPendingSize = maxSize; + return this; + } + + /** + * Returns the total size of pending messages. + * + * @return The size. + */ + public long getPendingSize() { + return pendingSize; + } + +} diff --git a/messagebus/src/main/java/com/yahoo/messagebus/ThrottlePolicy.java b/messagebus/src/main/java/com/yahoo/messagebus/ThrottlePolicy.java index 611694b0c62..6b095060427 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/ThrottlePolicy.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/ThrottlePolicy.java @@ -1,36 +1,36 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.messagebus;
-
-/**
- * An implementation of this interface is used by {@link SourceSession} to throttle output. Every message entering
- * {@link SourceSession#send(Message)} needs to be accepted by this interface's {@link #canSend(Message, int)} method.
- * All messages accepted are passed through the {@link #processMessage(Message)} method, and the corresponding replies
- * are passed through the {@link #processReply(Reply)} method.
- *
- * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
- */
-public interface ThrottlePolicy {
-
- /**
- * Returns whether or not the given message can be sent according to the current state of this policy.
- *
- * @param msg The message to evaluate.
- * @param pendingCount The current number of pending messages.
- * @return True to send the message.
- */
- public boolean canSend(Message msg, int pendingCount);
-
- /**
- * This method is called once for every message that was accepted by {@link #canSend(Message, int)} and sent.
- *
- * @param msg The message beint sent.
- */
- public void processMessage(Message msg);
-
- /**
- * This method is called once for every reply that is received.
- *
- * @param reply The reply received.
- */
- public void processReply(Reply reply);
-}
+package com.yahoo.messagebus; + +/** + * An implementation of this interface is used by {@link SourceSession} to throttle output. Every message entering + * {@link SourceSession#send(Message)} needs to be accepted by this interface's {@link #canSend(Message, int)} method. + * All messages accepted are passed through the {@link #processMessage(Message)} method, and the corresponding replies + * are passed through the {@link #processReply(Reply)} method. + * + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +public interface ThrottlePolicy { + + /** + * Returns whether or not the given message can be sent according to the current state of this policy. + * + * @param msg The message to evaluate. + * @param pendingCount The current number of pending messages. + * @return True to send the message. + */ + public boolean canSend(Message msg, int pendingCount); + + /** + * This method is called once for every message that was accepted by {@link #canSend(Message, int)} and sent. + * + * @param msg The message beint sent. + */ + public void processMessage(Message msg); + + /** + * This method is called once for every reply that is received. + * + * @param reply The reply received. + */ + public void processReply(Reply reply); +} diff --git a/messagebus/src/main/java/com/yahoo/messagebus/TraceLevel.java b/messagebus/src/main/java/com/yahoo/messagebus/TraceLevel.java index 994eb17b434..07b91a3280f 100755 --- a/messagebus/src/main/java/com/yahoo/messagebus/TraceLevel.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/TraceLevel.java @@ -1,30 +1,30 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.messagebus;
-
-/**
- * This class defines the {@link Trace} levels used by message bus.
- *
- * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
- */
-public final class TraceLevel {
-
- /**
- * Traces whenever an Error is added to a Reply.
- */
- public static final int ERROR = 1;
-
- /**
- * Traces sending and receiving messages and replies on network level.
- */
- public static final int SEND_RECEIVE = 4;
-
- /**
- * Traces splitting messages and merging replies.
- */
- public static final int SPLIT_MERGE = 5;
-
- /**
- * Traces information about which internal components are processing a routable.
- */
- public static final int COMPONENT = 6;
+package com.yahoo.messagebus; + +/** + * This class defines the {@link Trace} levels used by message bus. + * + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +public final class TraceLevel { + + /** + * Traces whenever an Error is added to a Reply. + */ + public static final int ERROR = 1; + + /** + * Traces sending and receiving messages and replies on network level. + */ + public static final int SEND_RECEIVE = 4; + + /** + * Traces splitting messages and merging replies. + */ + public static final int SPLIT_MERGE = 5; + + /** + * Traces information about which internal components are processing a routable. + */ + public static final int COMPONENT = 6; }
\ No newline at end of file diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalNetwork.java b/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalNetwork.java index 78cf352cfbf..efe96c5babb 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalNetwork.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalNetwork.java @@ -35,7 +35,7 @@ public class LocalNetwork implements Network { public LocalNetwork() { this(new LocalWire()); } - + public LocalNetwork(LocalWire wire) { this.wire = wire; this.hostId = wire.newHostId(); diff --git a/messagebus/src/main/java/com/yahoo/messagebus/routing/ApplicationSpec.java b/messagebus/src/main/java/com/yahoo/messagebus/routing/ApplicationSpec.java index 47f688e0f51..adc393a77c7 100755 --- a/messagebus/src/main/java/com/yahoo/messagebus/routing/ApplicationSpec.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/routing/ApplicationSpec.java @@ -1,131 +1,131 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.messagebus.routing;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.regex.Pattern;
-
-/**
- * This class holds the specifications of an application running message bus services. It is used for ensuring that a
- * {@link RoutingSpec} holds valid routing specifications.
- *
- * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
- */
-public class ApplicationSpec {
-
- private final HashMap<String, HashSet<String>> services = new HashMap<String, HashSet<String>>();
-
- /**
- * Constructs a new instance of this class.
- */
- public ApplicationSpec() {
- // empty
- }
-
- /**
- * Implements the copy constructor.
- *
- * @param obj The object to copy.
- */
- public ApplicationSpec(ApplicationSpec obj) {
- add(obj);
- }
-
- /**
- * Adds the content of the given application to this.
- *
- * @param app The application whose content to copy.
- * @return This, to allow chaining.
- */
- public ApplicationSpec add(ApplicationSpec app) {
- for (Map.Entry<String, HashSet<String>> entry : app.services.entrySet()) {
- String protocol = entry.getKey();
- for (String service : entry.getValue()) {
- addService(protocol, service);
- }
- }
- return this;
- }
-
- /**
- * Adds a service name to the list of known services.
- *
- * @param protocol The protocol for which to add the service.
- * @param name The service to add.
- * @return This, to allow chaining.
- */
- public ApplicationSpec addService(String protocol, String name) {
- if (!services.containsKey(protocol)) {
- services.put(protocol, new HashSet<String>());
- }
- services.get(protocol).add(name);
- return this;
- }
-
- /**
- * Determines whether or not the given service pattern matches any of the known services.
- *
- * @param protocol The protocol whose services to check.
- * @param pattern The pattern to match.
- * @return True if at least one service was found.
- */
- public boolean isService(String protocol, String pattern) {
- if (services.containsKey(protocol)) {
- Pattern regex = toRegex(pattern);
- for (String service : services.get(protocol)) {
- if (regex.matcher(service).find()) {
- return true;
- }
- }
- }
- return false;
- }
-
- /**
- * Converts the given string pattern to a usable regex pattern object.
- *
- * @param pattern The string pattern to convert.
- * @return The corresponding regex pattern object.
- */
- private static Pattern toRegex(String pattern) {
- StringBuilder ret = new StringBuilder();
- ret.append("^");
- for (int i = 0; i < pattern.length(); i++) {
- ret.append(toRegex(pattern.charAt(i)));
- }
- ret.append("$");
- return Pattern.compile(ret.toString());
- }
-
- /**
- * Converts a single string pattern char to a regex string. This method is invoked by {@link #toRegex(String)} once
- * for each character in the string pattern.
- *
- * @param c The character to convert.
- * @return The corresponding regex pattern string.
- */
- private static String toRegex(char c) {
- switch (c) {
- case '*':
- return ".*";
- case '?':
- return ".";
- case '^':
- case '$':
- case '|':
- case '{':
- case '}':
- case '(':
- case ')':
- case '[':
- case ']':
- case '\\':
- case '+':
- case '.':
- return "\\" + c;
- default:
- return "" + c;
- }
- }
-}
+package com.yahoo.messagebus.routing; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.regex.Pattern; + +/** + * This class holds the specifications of an application running message bus services. It is used for ensuring that a + * {@link RoutingSpec} holds valid routing specifications. + * + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +public class ApplicationSpec { + + private final HashMap<String, HashSet<String>> services = new HashMap<String, HashSet<String>>(); + + /** + * Constructs a new instance of this class. + */ + public ApplicationSpec() { + // empty + } + + /** + * Implements the copy constructor. + * + * @param obj The object to copy. + */ + public ApplicationSpec(ApplicationSpec obj) { + add(obj); + } + + /** + * Adds the content of the given application to this. + * + * @param app The application whose content to copy. + * @return This, to allow chaining. + */ + public ApplicationSpec add(ApplicationSpec app) { + for (Map.Entry<String, HashSet<String>> entry : app.services.entrySet()) { + String protocol = entry.getKey(); + for (String service : entry.getValue()) { + addService(protocol, service); + } + } + return this; + } + + /** + * Adds a service name to the list of known services. + * + * @param protocol The protocol for which to add the service. + * @param name The service to add. + * @return This, to allow chaining. + */ + public ApplicationSpec addService(String protocol, String name) { + if (!services.containsKey(protocol)) { + services.put(protocol, new HashSet<String>()); + } + services.get(protocol).add(name); + return this; + } + + /** + * Determines whether or not the given service pattern matches any of the known services. + * + * @param protocol The protocol whose services to check. + * @param pattern The pattern to match. + * @return True if at least one service was found. + */ + public boolean isService(String protocol, String pattern) { + if (services.containsKey(protocol)) { + Pattern regex = toRegex(pattern); + for (String service : services.get(protocol)) { + if (regex.matcher(service).find()) { + return true; + } + } + } + return false; + } + + /** + * Converts the given string pattern to a usable regex pattern object. + * + * @param pattern The string pattern to convert. + * @return The corresponding regex pattern object. + */ + private static Pattern toRegex(String pattern) { + StringBuilder ret = new StringBuilder(); + ret.append("^"); + for (int i = 0; i < pattern.length(); i++) { + ret.append(toRegex(pattern.charAt(i))); + } + ret.append("$"); + return Pattern.compile(ret.toString()); + } + + /** + * Converts a single string pattern char to a regex string. This method is invoked by {@link #toRegex(String)} once + * for each character in the string pattern. + * + * @param c The character to convert. + * @return The corresponding regex pattern string. + */ + private static String toRegex(char c) { + switch (c) { + case '*': + return ".*"; + case '?': + return "."; + case '^': + case '$': + case '|': + case '{': + case '}': + case '(': + case ')': + case '[': + case ']': + case '\\': + case '+': + case '.': + return "\\" + c; + default: + return "" + c; + } + } +} diff --git a/messagebus/src/main/java/com/yahoo/messagebus/routing/HopDirective.java b/messagebus/src/main/java/com/yahoo/messagebus/routing/HopDirective.java index 9623b45ca20..89ee638f41c 100755 --- a/messagebus/src/main/java/com/yahoo/messagebus/routing/HopDirective.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/routing/HopDirective.java @@ -1,26 +1,26 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.messagebus.routing;
-
-/**
- * This class is the base class for the primitives that make up a {@link Hop}'s selector.
- *
- * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
- */
-public interface HopDirective {
-
- /**
- * Returns true if this directive matches another.
- *
- * @param dir The directive to compare this to.
- * @return True if this matches the argument.
- */
- public boolean matches(HopDirective dir);
-
- /**
- * Returns a string representation of this that can be debugged but not parsed.
- *
- * @return The debug string.
- */
- public String toDebugString();
-}
-
+package com.yahoo.messagebus.routing; + +/** + * This class is the base class for the primitives that make up a {@link Hop}'s selector. + * + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +public interface HopDirective { + + /** + * Returns true if this directive matches another. + * + * @param dir The directive to compare this to. + * @return True if this matches the argument. + */ + public boolean matches(HopDirective dir); + + /** + * Returns a string representation of this that can be debugged but not parsed. + * + * @return The debug string. + */ + public String toDebugString(); +} + diff --git a/messagebus/src/main/java/com/yahoo/messagebus/routing/PolicyDirective.java b/messagebus/src/main/java/com/yahoo/messagebus/routing/PolicyDirective.java index e4b88dca6e7..7dd1966cb7b 100755 --- a/messagebus/src/main/java/com/yahoo/messagebus/routing/PolicyDirective.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/routing/PolicyDirective.java @@ -1,83 +1,83 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.messagebus.routing;
-
-/**
- * This class represents a policy directive within a {@link Hop}'s selector. This means to create the named protocol
- * using the given parameter string, and the running that protocol within the context of this directive.
- *
- * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
- */
-public class PolicyDirective implements HopDirective {
-
- private final String name;
- private final String param;
-
- /**
- * Constructs a new policy selector item.
- *
- * @param name The name of the policy to invoke.
- * @param param The parameter to pass to the name constructor.
- */
- public PolicyDirective(String name, String param) {
- this.name = name;
- this.param = param;
- }
-
- @Override
- public boolean matches(HopDirective dir) {
- return true;
- }
-
- /**
- * Returns the name of the policy that this item is to invoke.
- *
- * @return The name name.
- */
- public String getName() {
- return name;
- }
-
- /**
- * Returns the parameter string for this policy directive.
- *
- * @return The parameter.
- */
- public String getParam() {
- return param;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (!(obj instanceof PolicyDirective)) {
- return false;
- }
- PolicyDirective rhs = (PolicyDirective)obj;
- if (!name.equals(rhs.name)) {
- return false;
- }
- if (param == null && rhs.param != null) {
- return false;
- }
- if (param != null && !param.equals(rhs.param)) {
- return false;
- }
- return true;
- }
-
- @Override
- public String toString() {
- return "[" + name + (param != null && param.length() > 0 ? ":" + param : "") + "]";
- }
-
- @Override
- public String toDebugString() {
- return "PolicyDirective(name = '" + name + "', param = '" + param + "')";
- }
-
- @Override
- public int hashCode() {
- int result = name != null ? name.hashCode() : 0;
- result = 31 * result + (param != null ? param.hashCode() : 0);
- return result;
- }
+package com.yahoo.messagebus.routing; + +/** + * This class represents a policy directive within a {@link Hop}'s selector. This means to create the named protocol + * using the given parameter string, and the running that protocol within the context of this directive. + * + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +public class PolicyDirective implements HopDirective { + + private final String name; + private final String param; + + /** + * Constructs a new policy selector item. + * + * @param name The name of the policy to invoke. + * @param param The parameter to pass to the name constructor. + */ + public PolicyDirective(String name, String param) { + this.name = name; + this.param = param; + } + + @Override + public boolean matches(HopDirective dir) { + return true; + } + + /** + * Returns the name of the policy that this item is to invoke. + * + * @return The name name. + */ + public String getName() { + return name; + } + + /** + * Returns the parameter string for this policy directive. + * + * @return The parameter. + */ + public String getParam() { + return param; + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof PolicyDirective)) { + return false; + } + PolicyDirective rhs = (PolicyDirective)obj; + if (!name.equals(rhs.name)) { + return false; + } + if (param == null && rhs.param != null) { + return false; + } + if (param != null && !param.equals(rhs.param)) { + return false; + } + return true; + } + + @Override + public String toString() { + return "[" + name + (param != null && param.length() > 0 ? ":" + param : "") + "]"; + } + + @Override + public String toDebugString() { + return "PolicyDirective(name = '" + name + "', param = '" + param + "')"; + } + + @Override + public int hashCode() { + int result = name != null ? name.hashCode() : 0; + result = 31 * result + (param != null ? param.hashCode() : 0); + return result; + } }
\ No newline at end of file diff --git a/messagebus/src/main/java/com/yahoo/messagebus/routing/RetryPolicy.java b/messagebus/src/main/java/com/yahoo/messagebus/routing/RetryPolicy.java index 19ea949e741..4231c389a5f 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/routing/RetryPolicy.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/routing/RetryPolicy.java @@ -1,30 +1,30 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.messagebus.routing;
-
-/**
- * When a {@link com.yahoo.messagebus.Reply} containing errors is returned to a {@link com.yahoo.messagebus.MessageBus},
- * an object implementing this interface is consulted on whether or not to resend the corresponding {@link
- * com.yahoo.messagebus.Message}. The policy is passed to the message bus at creation time using the {@link
- * com.yahoo.messagebus.MessageBusParams#setRetryPolicy(RetryPolicy)} method.
- *
- * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
- */
-public interface RetryPolicy {
-
- /**
- * Returns whether or not a {@link com.yahoo.messagebus.Reply} containing an {@link com.yahoo.messagebus.Error} with
- * the given error code can be retried. This method is invoked once for each error in a reply.
- *
- * @param errorCode The code to check.
- * @return True if the message can be resent.
- */
- public boolean canRetry(int errorCode);
-
- /**
- * Returns the number of seconds to delay resending a message.
- *
- * @param retry The retry attempt.
- * @return The delay in seconds.
- */
- public double getRetryDelay(int retry);
-}
+package com.yahoo.messagebus.routing; + +/** + * When a {@link com.yahoo.messagebus.Reply} containing errors is returned to a {@link com.yahoo.messagebus.MessageBus}, + * an object implementing this interface is consulted on whether or not to resend the corresponding {@link + * com.yahoo.messagebus.Message}. The policy is passed to the message bus at creation time using the {@link + * com.yahoo.messagebus.MessageBusParams#setRetryPolicy(RetryPolicy)} method. + * + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +public interface RetryPolicy { + + /** + * Returns whether or not a {@link com.yahoo.messagebus.Reply} containing an {@link com.yahoo.messagebus.Error} with + * the given error code can be retried. This method is invoked once for each error in a reply. + * + * @param errorCode The code to check. + * @return True if the message can be resent. + */ + public boolean canRetry(int errorCode); + + /** + * Returns the number of seconds to delay resending a message. + * + * @param retry The retry attempt. + * @return The delay in seconds. + */ + public double getRetryDelay(int retry); +} diff --git a/messagebus/src/main/java/com/yahoo/messagebus/routing/RetryTransientErrorsPolicy.java b/messagebus/src/main/java/com/yahoo/messagebus/routing/RetryTransientErrorsPolicy.java index 128ca3e819d..9dd2f7889ec 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/routing/RetryTransientErrorsPolicy.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/routing/RetryTransientErrorsPolicy.java @@ -1,48 +1,48 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.messagebus.routing;
-
-import com.yahoo.messagebus.ErrorCode;
-
-/**
- * Implements a retry policy that allows resending of any error that is not fatal. It also does progressive back-off,
- * delaying each attempt by the given time multiplied by the retry attempt.
- *
- * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
- */
-public class RetryTransientErrorsPolicy implements RetryPolicy {
-
- private volatile boolean enabled = true;
- private volatile double baseDelay = 1;
-
- /**
- * Sets whether or not this policy should allow retries or not.
- *
- * @param enabled True to allow retries.
- * @return This, to allow chaining.
- */
- public RetryTransientErrorsPolicy setEnabled(boolean enabled) {
- this.enabled = enabled;
- return this;
- }
-
- /**
- * Sets the base delay in seconds to wait between retries. This amount is multiplied by the retry number.
- *
- * @param baseDelay The time in seconds.
- * @return This, to allow chaining.
- */
- public RetryTransientErrorsPolicy setBaseDelay(double baseDelay) {
- this.baseDelay = baseDelay;
- return this;
- }
-
- @Override
- public boolean canRetry(int errorCode) {
- return enabled && errorCode < ErrorCode.FATAL_ERROR;
- }
-
- @Override
- public double getRetryDelay(int retry) {
- return baseDelay * retry;
- }
-}
+package com.yahoo.messagebus.routing; + +import com.yahoo.messagebus.ErrorCode; + +/** + * Implements a retry policy that allows resending of any error that is not fatal. It also does progressive back-off, + * delaying each attempt by the given time multiplied by the retry attempt. + * + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +public class RetryTransientErrorsPolicy implements RetryPolicy { + + private volatile boolean enabled = true; + private volatile double baseDelay = 1; + + /** + * Sets whether or not this policy should allow retries or not. + * + * @param enabled True to allow retries. + * @return This, to allow chaining. + */ + public RetryTransientErrorsPolicy setEnabled(boolean enabled) { + this.enabled = enabled; + return this; + } + + /** + * Sets the base delay in seconds to wait between retries. This amount is multiplied by the retry number. + * + * @param baseDelay The time in seconds. + * @return This, to allow chaining. + */ + public RetryTransientErrorsPolicy setBaseDelay(double baseDelay) { + this.baseDelay = baseDelay; + return this; + } + + @Override + public boolean canRetry(int errorCode) { + return enabled && errorCode < ErrorCode.FATAL_ERROR; + } + + @Override + public double getRetryDelay(int retry) { + return baseDelay * retry; + } +} diff --git a/messagebus/src/main/java/com/yahoo/messagebus/routing/RouteDirective.java b/messagebus/src/main/java/com/yahoo/messagebus/routing/RouteDirective.java index 6ea79a256e0..71abfc5376e 100755 --- a/messagebus/src/main/java/com/yahoo/messagebus/routing/RouteDirective.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/routing/RouteDirective.java @@ -1,63 +1,63 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.messagebus.routing;
-
-/**
- * This class represents a route directive within a {@link Hop}'s selector. This will be replaced by the named route
- * when evaluated. If the route is not present in the running protocol's routing table, routing will fail.
- *
- * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
- */
-public class RouteDirective implements HopDirective {
-
- private final String name;
-
- /**
- * Constructs a new directive to insert a route.
- *
- * @param name The name of the route to insert.
- */
- public RouteDirective(String name) {
- this.name = name;
- }
-
- @Override
- public boolean matches(HopDirective dir) {
- return dir instanceof RouteDirective && name.equals(((RouteDirective)dir).name);
- }
-
- /**
- * Returns the name of the route to insert.
- *
- * @return The route name.
- */
- public String getName() {
- return name;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (!(obj instanceof RouteDirective)) {
- return false;
- }
- RouteDirective rhs = (RouteDirective)obj;
- if (!name.equals(rhs.name)) {
- return false;
- }
- return true;
- }
-
- @Override
- public String toString() {
- return "route:" + name;
- }
-
- @Override
- public String toDebugString() {
- return "RouteDirective(name = '" + name + "')";
- }
-
- @Override
- public int hashCode() {
- return name != null ? name.hashCode() : 0;
- }
-}
+package com.yahoo.messagebus.routing; + +/** + * This class represents a route directive within a {@link Hop}'s selector. This will be replaced by the named route + * when evaluated. If the route is not present in the running protocol's routing table, routing will fail. + * + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +public class RouteDirective implements HopDirective { + + private final String name; + + /** + * Constructs a new directive to insert a route. + * + * @param name The name of the route to insert. + */ + public RouteDirective(String name) { + this.name = name; + } + + @Override + public boolean matches(HopDirective dir) { + return dir instanceof RouteDirective && name.equals(((RouteDirective)dir).name); + } + + /** + * Returns the name of the route to insert. + * + * @return The route name. + */ + public String getName() { + return name; + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof RouteDirective)) { + return false; + } + RouteDirective rhs = (RouteDirective)obj; + if (!name.equals(rhs.name)) { + return false; + } + return true; + } + + @Override + public String toString() { + return "route:" + name; + } + + @Override + public String toDebugString() { + return "RouteDirective(name = '" + name + "')"; + } + + @Override + public int hashCode() { + return name != null ? name.hashCode() : 0; + } +} diff --git a/messagebus/src/main/java/com/yahoo/messagebus/routing/RoutingNodeIterator.java b/messagebus/src/main/java/com/yahoo/messagebus/routing/RoutingNodeIterator.java index dbf152fdfcf..e8caf3ad9b1 100755 --- a/messagebus/src/main/java/com/yahoo/messagebus/routing/RoutingNodeIterator.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/routing/RoutingNodeIterator.java @@ -1,107 +1,107 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.messagebus.routing;
-
-import com.yahoo.messagebus.Reply;
-
-import java.util.Iterator;
-import java.util.List;
-
-/**
- * Implements an iterator for routing nodes. Use {@link RoutingContext#getChildIterator()} to retrieve an instance of
- * this.
- *
- * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
- */
-public class RoutingNodeIterator {
-
- // The underlying iterator.
- private Iterator<RoutingNode> it;
-
- // The current child entry.
- private RoutingNode entry;
-
- /**
- * Constructs a new iterator based on a given list.
- *
- * @param children The list to iterate through.
- */
- public RoutingNodeIterator(List<RoutingNode> children) {
- it = children.iterator();
- next();
- }
-
- /**
- * Steps to the next child in the map.
- *
- * @return This, to allow chaining.
- */
- public RoutingNodeIterator next() {
- entry = it.hasNext() ? it.next() : null;
- return this;
- }
-
- /**
- * Skips the given number of children.
- *
- * @param num The number of children to skip.
- * @return This, to allow chaining.
- */
- public RoutingNodeIterator skip(int num) {
- for (int i = 0; i < num && isValid(); ++i) {
- next();
- }
- return this;
- }
-
- /**
- * Returns whether or not this iterator is valid.
- *
- * @return True if we are still pointing to a valid entry.
- */
- public boolean isValid() {
- return entry != null;
- }
-
- /**
- * Returns the route of the current child.
- *
- * @return The route.
- */
- public Route getRoute() {
- return entry.getRoute();
- }
-
- /**
- * Returns whether or not a reply is set in the current child.
- *
- * @return True if a reply is available.
- */
- public boolean hasReply() {
- return entry.hasReply();
- }
-
- /**
- * Removes and returns the reply of the current child. This is the correct way of reusing a reply of a child node,
- * the {@link #getReplyRef()} should be used when just inspecting a child reply.
- *
- * @return The reply.
- */
- public Reply removeReply() {
- Reply ret = entry.getReply();
- ret.getTrace().setLevel(entry.getTrace().getLevel());
- ret.getTrace().swap(entry.getTrace());
- entry.setReply(null);
- return ret;
- }
-
- /**
- * Returns the reply of the current child. It is VERY important that the reply returned by this function is not
- * reused anywhere. This is a reference to another node's reply, do NOT use it for anything but inspection. If you
- * want to retrieve and reuse it, call {@link #removeReply()} instead.
- *
- * @return The reply.
- */
- public Reply getReplyRef() {
- return entry.getReply();
- }
-}
+package com.yahoo.messagebus.routing; + +import com.yahoo.messagebus.Reply; + +import java.util.Iterator; +import java.util.List; + +/** + * Implements an iterator for routing nodes. Use {@link RoutingContext#getChildIterator()} to retrieve an instance of + * this. + * + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +public class RoutingNodeIterator { + + // The underlying iterator. + private Iterator<RoutingNode> it; + + // The current child entry. + private RoutingNode entry; + + /** + * Constructs a new iterator based on a given list. + * + * @param children The list to iterate through. + */ + public RoutingNodeIterator(List<RoutingNode> children) { + it = children.iterator(); + next(); + } + + /** + * Steps to the next child in the map. + * + * @return This, to allow chaining. + */ + public RoutingNodeIterator next() { + entry = it.hasNext() ? it.next() : null; + return this; + } + + /** + * Skips the given number of children. + * + * @param num The number of children to skip. + * @return This, to allow chaining. + */ + public RoutingNodeIterator skip(int num) { + for (int i = 0; i < num && isValid(); ++i) { + next(); + } + return this; + } + + /** + * Returns whether or not this iterator is valid. + * + * @return True if we are still pointing to a valid entry. + */ + public boolean isValid() { + return entry != null; + } + + /** + * Returns the route of the current child. + * + * @return The route. + */ + public Route getRoute() { + return entry.getRoute(); + } + + /** + * Returns whether or not a reply is set in the current child. + * + * @return True if a reply is available. + */ + public boolean hasReply() { + return entry.hasReply(); + } + + /** + * Removes and returns the reply of the current child. This is the correct way of reusing a reply of a child node, + * the {@link #getReplyRef()} should be used when just inspecting a child reply. + * + * @return The reply. + */ + public Reply removeReply() { + Reply ret = entry.getReply(); + ret.getTrace().setLevel(entry.getTrace().getLevel()); + ret.getTrace().swap(entry.getTrace()); + entry.setReply(null); + return ret; + } + + /** + * Returns the reply of the current child. It is VERY important that the reply returned by this function is not + * reused anywhere. This is a reference to another node's reply, do NOT use it for anything but inspection. If you + * want to retrieve and reuse it, call {@link #removeReply()} instead. + * + * @return The reply. + */ + public Reply getReplyRef() { + return entry.getReply(); + } +} diff --git a/messagebus/src/main/java/com/yahoo/messagebus/routing/TcpDirective.java b/messagebus/src/main/java/com/yahoo/messagebus/routing/TcpDirective.java index dd0f6a47596..85be2c26baf 100755 --- a/messagebus/src/main/java/com/yahoo/messagebus/routing/TcpDirective.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/routing/TcpDirective.java @@ -1,100 +1,100 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.messagebus.routing;
-
-/**
- * This class represents a tcp directive within a {@link Hop}'s selector. This is a connection string used to establish
- * a direct connection to a host, bypassing service lookups through Slobrok.
- *
- * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
- */
-public class TcpDirective implements HopDirective {
-
- private final String host;
- private final int port;
- private final String session;
-
- /**
- * Constructs a new directive to route directly to a tcp address.
- *
- * @param host The host name to connect to.
- * @param port The port to connect to.
- * @param session The session to route to.
- */
- public TcpDirective(String host, int port, String session) {
- this.host = host;
- this.port = port;
- this.session = session;
- }
-
- @Override
- public boolean matches(HopDirective dir) {
- if (!(dir instanceof TcpDirective)) {
- return false;
- }
- TcpDirective rhs = (TcpDirective)dir;
- return host.equals(rhs.host) && port == rhs.port && session.equals(rhs.session);
- }
-
- /**
- * Returns the host to connect to. This may be an ip address or a name.
- *
- * @return The host.
- */
- public String getHost() {
- return host;
- }
-
- /**
- * Returns the port to connect to on the remove host.
- *
- * @return The port number.
- */
- public int getPort() {
- return port;
- }
-
- /**
- * Returns the name of the session to route to.
- *
- * @return The session name.
- */
- public String getSession() {
- return session;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (!(obj instanceof TcpDirective)) {
- return false;
- }
- TcpDirective rhs = (TcpDirective)obj;
- if (!host.equals(rhs.host)) {
- return false;
- }
- if (port != rhs.port) {
- return false;
- }
- if (!session.equals(rhs.session)) {
- return false;
- }
- return true;
- }
-
- @Override
- public String toString() {
- return "tcp/" + host + ":" + port + "/" + session;
- }
-
- @Override
- public String toDebugString() {
- return "TcpDirective(host = '" + host + "', port = " + port + ", session = '" + session + "')";
- }
-
- @Override
- public int hashCode() {
- int result = host != null ? host.hashCode() : 0;
- result = 31 * result + port;
- result = 31 * result + (session != null ? session.hashCode() : 0);
- return result;
- }
-}
+package com.yahoo.messagebus.routing; + +/** + * This class represents a tcp directive within a {@link Hop}'s selector. This is a connection string used to establish + * a direct connection to a host, bypassing service lookups through Slobrok. + * + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +public class TcpDirective implements HopDirective { + + private final String host; + private final int port; + private final String session; + + /** + * Constructs a new directive to route directly to a tcp address. + * + * @param host The host name to connect to. + * @param port The port to connect to. + * @param session The session to route to. + */ + public TcpDirective(String host, int port, String session) { + this.host = host; + this.port = port; + this.session = session; + } + + @Override + public boolean matches(HopDirective dir) { + if (!(dir instanceof TcpDirective)) { + return false; + } + TcpDirective rhs = (TcpDirective)dir; + return host.equals(rhs.host) && port == rhs.port && session.equals(rhs.session); + } + + /** + * Returns the host to connect to. This may be an ip address or a name. + * + * @return The host. + */ + public String getHost() { + return host; + } + + /** + * Returns the port to connect to on the remove host. + * + * @return The port number. + */ + public int getPort() { + return port; + } + + /** + * Returns the name of the session to route to. + * + * @return The session name. + */ + public String getSession() { + return session; + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof TcpDirective)) { + return false; + } + TcpDirective rhs = (TcpDirective)obj; + if (!host.equals(rhs.host)) { + return false; + } + if (port != rhs.port) { + return false; + } + if (!session.equals(rhs.session)) { + return false; + } + return true; + } + + @Override + public String toString() { + return "tcp/" + host + ":" + port + "/" + session; + } + + @Override + public String toDebugString() { + return "TcpDirective(host = '" + host + "', port = " + port + ", session = '" + session + "')"; + } + + @Override + public int hashCode() { + int result = host != null ? host.hashCode() : 0; + result = 31 * result + port; + result = 31 * result + (session != null ? session.hashCode() : 0); + return result; + } +} diff --git a/messagebus/src/main/java/com/yahoo/messagebus/routing/VerbatimDirective.java b/messagebus/src/main/java/com/yahoo/messagebus/routing/VerbatimDirective.java index f2f95fa53f8..0f743ef8bf5 100755 --- a/messagebus/src/main/java/com/yahoo/messagebus/routing/VerbatimDirective.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/routing/VerbatimDirective.java @@ -1,65 +1,65 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.messagebus.routing;
-
-/**
- * This class represents a verbatim match within a {@link Hop}'s selector. This is nothing more than a string that will
- * be used as-is when performing service name lookups.
- *
- * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
- */
-public class VerbatimDirective implements HopDirective {
-
- private final String image;
-
- /**
- * Constructs a new verbatim selector item for a given image.
- *
- * @param image The image to assign to this.
- */
- public VerbatimDirective(String image) {
- this.image = image;
- }
-
- @Override
- public boolean matches(HopDirective dir) {
-
- return dir instanceof VerbatimDirective && image.equals(((VerbatimDirective)dir).image);
- }
-
- /**
- * Returns the image to which this is a verbatim match.
- *
- * @return The image.
- */
- public String getImage() {
- return image;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (!(obj instanceof VerbatimDirective)) {
- return false;
- }
- VerbatimDirective rhs = (VerbatimDirective)obj;
- if (!image.equals(rhs.image)) {
- return false;
- }
- return true;
- }
-
- @Override
- public int hashCode() {
- return image != null ? image.hashCode() : 0;
- }
-
- @Override
- public String toString() {
- return image;
- }
-
- @Override
- public String toDebugString() {
- return "VerbatimDirective(image = '" + image + "')";
- }
-}
-
+package com.yahoo.messagebus.routing; + +/** + * This class represents a verbatim match within a {@link Hop}'s selector. This is nothing more than a string that will + * be used as-is when performing service name lookups. + * + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +public class VerbatimDirective implements HopDirective { + + private final String image; + + /** + * Constructs a new verbatim selector item for a given image. + * + * @param image The image to assign to this. + */ + public VerbatimDirective(String image) { + this.image = image; + } + + @Override + public boolean matches(HopDirective dir) { + + return dir instanceof VerbatimDirective && image.equals(((VerbatimDirective)dir).image); + } + + /** + * Returns the image to which this is a verbatim match. + * + * @return The image. + */ + public String getImage() { + return image; + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof VerbatimDirective)) { + return false; + } + VerbatimDirective rhs = (VerbatimDirective)obj; + if (!image.equals(rhs.image)) { + return false; + } + return true; + } + + @Override + public int hashCode() { + return image != null ? image.hashCode() : 0; + } + + @Override + public String toString() { + return image; + } + + @Override + public String toDebugString() { + return "VerbatimDirective(image = '" + image + "')"; + } +} + diff --git a/messagebus/src/main/java/com/yahoo/messagebus/routing/test/CustomPolicyFactory.java b/messagebus/src/main/java/com/yahoo/messagebus/routing/test/CustomPolicyFactory.java index 60f20b55f8d..2fa167ac571 100755 --- a/messagebus/src/main/java/com/yahoo/messagebus/routing/test/CustomPolicyFactory.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/routing/test/CustomPolicyFactory.java @@ -1,52 +1,52 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.messagebus.routing.test;
-
-import com.yahoo.messagebus.routing.Route;
-import com.yahoo.messagebus.routing.RoutingPolicy;
-import com.yahoo.messagebus.test.SimpleProtocol;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-/**
- * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
- */
-public class CustomPolicyFactory implements SimpleProtocol.PolicyFactory {
-
- private boolean selectOnRetry;
- private final List<Integer> consumableErrors = new ArrayList<Integer>();
-
- public CustomPolicyFactory() {
- this(true);
- }
-
- public CustomPolicyFactory(boolean selectOnRetry) {
- this(selectOnRetry, new ArrayList<Integer>());
- }
-
- public CustomPolicyFactory(boolean selectOnRetry, int consumableError) {
- this(selectOnRetry, Arrays.asList(consumableError));
- }
-
- public CustomPolicyFactory(boolean selectOnRetry, List<Integer> consumableErrors) {
- this.selectOnRetry = selectOnRetry;
- this.consumableErrors.addAll(consumableErrors);
- }
-
- public RoutingPolicy create(String param) {
- return new CustomPolicy(selectOnRetry, consumableErrors, parseRoutes(param));
- }
-
- public static List<Route> parseRoutes(String routes) {
- List<Route> ret = new ArrayList<Route>();
- if (routes != null && !routes.isEmpty()) {
- for (String route : routes.split(",")) {
- Route r = Route.parse(route);
- assert(route.equals(r.toString()));
- ret.add(r);
- }
- }
- return ret;
- }
-}
+package com.yahoo.messagebus.routing.test; + +import com.yahoo.messagebus.routing.Route; +import com.yahoo.messagebus.routing.RoutingPolicy; +import com.yahoo.messagebus.test.SimpleProtocol; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +/** + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +public class CustomPolicyFactory implements SimpleProtocol.PolicyFactory { + + private boolean selectOnRetry; + private final List<Integer> consumableErrors = new ArrayList<Integer>(); + + public CustomPolicyFactory() { + this(true); + } + + public CustomPolicyFactory(boolean selectOnRetry) { + this(selectOnRetry, new ArrayList<Integer>()); + } + + public CustomPolicyFactory(boolean selectOnRetry, int consumableError) { + this(selectOnRetry, Arrays.asList(consumableError)); + } + + public CustomPolicyFactory(boolean selectOnRetry, List<Integer> consumableErrors) { + this.selectOnRetry = selectOnRetry; + this.consumableErrors.addAll(consumableErrors); + } + + public RoutingPolicy create(String param) { + return new CustomPolicy(selectOnRetry, consumableErrors, parseRoutes(param)); + } + + public static List<Route> parseRoutes(String routes) { + List<Route> ret = new ArrayList<Route>(); + if (routes != null && !routes.isEmpty()) { + for (String route : routes.split(",")) { + Route r = Route.parse(route); + assert(route.equals(r.toString())); + ret.add(r); + } + } + return ret; + } +} diff --git a/messagebus/src/test/java/com/yahoo/messagebus/ErrorTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/ErrorTestCase.java index e67fc5030ed..34e48bafc19 100755 --- a/messagebus/src/test/java/com/yahoo/messagebus/ErrorTestCase.java +++ b/messagebus/src/test/java/com/yahoo/messagebus/ErrorTestCase.java @@ -1,92 +1,92 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.messagebus;
-
-import com.yahoo.jrt.slobrok.server.Slobrok;
-import com.yahoo.messagebus.network.rpc.test.TestServer;
-import com.yahoo.messagebus.routing.RoutingTableSpec;
-import com.yahoo.messagebus.test.Receptor;
-import com.yahoo.messagebus.test.SimpleMessage;
-import com.yahoo.messagebus.test.SimpleProtocol;
-import org.junit.Test;
-
-import java.util.Arrays;
-
-import static org.junit.Assert.*;
-
-/**
- * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
- */
-public class ErrorTestCase {
-
- @Test
- public void requireThatAccessorsWork() {
- Error err = new Error(69, "foo");
- assertEquals(69, err.getCode());
- assertEquals("foo", err.getMessage());
-
- assertFalse(new Error(ErrorCode.TRANSIENT_ERROR, "foo").isFatal());
- assertFalse(new Error(ErrorCode.TRANSIENT_ERROR + 1, "foo").isFatal());
- assertTrue(new Error(ErrorCode.FATAL_ERROR, "foo").isFatal());
- assertTrue(new Error(ErrorCode.FATAL_ERROR + 1, "foo").isFatal());
- }
-
- @Test
- public void requireThatErrorIsPropagated() throws Exception {
- RoutingTableSpec table = new RoutingTableSpec(SimpleProtocol.NAME);
- table.addHop("itr", "test/itr/session", Arrays.asList("test/itr/session"));
- table.addHop("dst", "test/dst/session", Arrays.asList("test/dst/session"));
- table.addRoute("test", Arrays.asList("itr", "dst"));
-
- Slobrok slobrok = new Slobrok();
- TestServer src = new TestServer("test/src", table, slobrok, null, null);
- TestServer itr = new TestServer("test/itr", table, slobrok, null, null);
- TestServer dst = new TestServer("test/dst", table, slobrok, null, null);
-
- Receptor ss_rr = new Receptor();
- SourceSession ss = src.mb.createSourceSession(ss_rr);
-
- Receptor is_mr = new Receptor();
- Receptor is_rr = new Receptor();
- IntermediateSession is = itr.mb.createIntermediateSession("session", true, is_mr, is_rr);
-
- Receptor ds_mr = new Receptor();
- DestinationSession ds = dst.mb.createDestinationSession("session", true, ds_mr);
-
- src.waitSlobrok("test/itr/session", 1);
- src.waitSlobrok("test/dst/session", 1);
- itr.waitSlobrok("test/dst/session", 1);
-
- for (int i = 0; i < 5; i++) {
- assertTrue(ss.send(new SimpleMessage("msg"), "test").isAccepted());
- Message msg = is_mr.getMessage(60);
- assertNotNull(msg);
- is.forward(msg);
-
- assertNotNull(msg = ds_mr.getMessage(60));
- Reply reply = new EmptyReply();
- msg.swapState(reply);
- reply.addError(new Error(ErrorCode.APP_FATAL_ERROR, "fatality"));
- ds.reply(reply);
-
- assertNotNull(reply = is_rr.getReply(60));
- assertEquals(reply.getNumErrors(), 1);
- assertEquals(reply.getError(0).getService(), "test/dst/session");
- reply.addError(new Error(ErrorCode.APP_FATAL_ERROR, "fatality"));
- is.forward(reply);
-
- assertNotNull(reply = ss_rr.getReply(60));
- assertEquals(reply.getNumErrors(), 2);
- assertEquals(reply.getError(0).getService(), "test/dst/session");
- assertEquals(reply.getError(1).getService(), "test/itr/session");
- }
-
- ss.destroy();
- is.destroy();
- ds.destroy();
-
- dst.destroy();
- itr.destroy();
- src.destroy();
- slobrok.stop();
- }
-}
+package com.yahoo.messagebus; + +import com.yahoo.jrt.slobrok.server.Slobrok; +import com.yahoo.messagebus.network.rpc.test.TestServer; +import com.yahoo.messagebus.routing.RoutingTableSpec; +import com.yahoo.messagebus.test.Receptor; +import com.yahoo.messagebus.test.SimpleMessage; +import com.yahoo.messagebus.test.SimpleProtocol; +import org.junit.Test; + +import java.util.Arrays; + +import static org.junit.Assert.*; + +/** + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +public class ErrorTestCase { + + @Test + public void requireThatAccessorsWork() { + Error err = new Error(69, "foo"); + assertEquals(69, err.getCode()); + assertEquals("foo", err.getMessage()); + + assertFalse(new Error(ErrorCode.TRANSIENT_ERROR, "foo").isFatal()); + assertFalse(new Error(ErrorCode.TRANSIENT_ERROR + 1, "foo").isFatal()); + assertTrue(new Error(ErrorCode.FATAL_ERROR, "foo").isFatal()); + assertTrue(new Error(ErrorCode.FATAL_ERROR + 1, "foo").isFatal()); + } + + @Test + public void requireThatErrorIsPropagated() throws Exception { + RoutingTableSpec table = new RoutingTableSpec(SimpleProtocol.NAME); + table.addHop("itr", "test/itr/session", Arrays.asList("test/itr/session")); + table.addHop("dst", "test/dst/session", Arrays.asList("test/dst/session")); + table.addRoute("test", Arrays.asList("itr", "dst")); + + Slobrok slobrok = new Slobrok(); + TestServer src = new TestServer("test/src", table, slobrok, null, null); + TestServer itr = new TestServer("test/itr", table, slobrok, null, null); + TestServer dst = new TestServer("test/dst", table, slobrok, null, null); + + Receptor ss_rr = new Receptor(); + SourceSession ss = src.mb.createSourceSession(ss_rr); + + Receptor is_mr = new Receptor(); + Receptor is_rr = new Receptor(); + IntermediateSession is = itr.mb.createIntermediateSession("session", true, is_mr, is_rr); + + Receptor ds_mr = new Receptor(); + DestinationSession ds = dst.mb.createDestinationSession("session", true, ds_mr); + + src.waitSlobrok("test/itr/session", 1); + src.waitSlobrok("test/dst/session", 1); + itr.waitSlobrok("test/dst/session", 1); + + for (int i = 0; i < 5; i++) { + assertTrue(ss.send(new SimpleMessage("msg"), "test").isAccepted()); + Message msg = is_mr.getMessage(60); + assertNotNull(msg); + is.forward(msg); + + assertNotNull(msg = ds_mr.getMessage(60)); + Reply reply = new EmptyReply(); + msg.swapState(reply); + reply.addError(new Error(ErrorCode.APP_FATAL_ERROR, "fatality")); + ds.reply(reply); + + assertNotNull(reply = is_rr.getReply(60)); + assertEquals(reply.getNumErrors(), 1); + assertEquals(reply.getError(0).getService(), "test/dst/session"); + reply.addError(new Error(ErrorCode.APP_FATAL_ERROR, "fatality")); + is.forward(reply); + + assertNotNull(reply = ss_rr.getReply(60)); + assertEquals(reply.getNumErrors(), 2); + assertEquals(reply.getError(0).getService(), "test/dst/session"); + assertEquals(reply.getError(1).getService(), "test/itr/session"); + } + + ss.destroy(); + is.destroy(); + ds.destroy(); + + dst.destroy(); + itr.destroy(); + src.destroy(); + slobrok.stop(); + } +} diff --git a/messagebus/src/test/java/com/yahoo/messagebus/SendProxyTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/SendProxyTestCase.java index 6eb7257328b..8026b93e6d9 100644 --- a/messagebus/src/test/java/com/yahoo/messagebus/SendProxyTestCase.java +++ b/messagebus/src/test/java/com/yahoo/messagebus/SendProxyTestCase.java @@ -1,169 +1,169 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.messagebus;
-
-import com.yahoo.component.Vtag;
-import com.yahoo.jrt.ListenFailedException;
-import com.yahoo.jrt.slobrok.server.Slobrok;
-import com.yahoo.log.LogLevel;
-import com.yahoo.messagebus.network.Identity;
-import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
-import com.yahoo.messagebus.network.rpc.test.TestServer;
-import com.yahoo.messagebus.routing.Route;
-import com.yahoo.messagebus.test.Receptor;
-import com.yahoo.messagebus.test.SimpleMessage;
-import com.yahoo.messagebus.test.SimpleProtocol;
-import com.yahoo.messagebus.test.SimpleReply;
-import junit.framework.TestCase;
-
-import java.net.UnknownHostException;
-import java.util.logging.Handler;
-import java.util.logging.LogRecord;
-import java.util.logging.Logger;
-
-/**
- * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
- */
-public class SendProxyTestCase extends TestCase {
-
- Slobrok slobrok;
- TestServer srcServer, dstServer;
- SourceSession srcSession;
- DestinationSession dstSession;
-
- @Override
- public void setUp() throws UnknownHostException, ListenFailedException {
- slobrok = new Slobrok();
- dstServer = new TestServer(new MessageBusParams().addProtocol(new SimpleProtocol()),
- new RPCNetworkParams().setIdentity(new Identity("dst")).setSlobrokConfigId(TestServer.getSlobrokConfig(slobrok)));
- dstSession = dstServer.mb.createDestinationSession(new DestinationSessionParams().setName("session").setMessageHandler(new Receptor()));
- srcServer = new TestServer(new MessageBusParams().addProtocol(new SimpleProtocol()),
- new RPCNetworkParams().setSlobrokConfigId(TestServer.getSlobrokConfig(slobrok)));
- srcSession = srcServer.mb.createSourceSession(
- new SourceSessionParams().setTimeout(600.0).setThrottlePolicy(null).setReplyHandler(new Receptor()));
- assertTrue(srcServer.waitSlobrok("dst/session", 1));
- }
-
- @Override
- public void tearDown() {
- slobrok.stop();
- dstSession.destroy();
- dstServer.destroy();
- srcSession.destroy();
- srcServer.destroy();
- }
-
- public void testTraceByLogLevel() {
- Logger log = Logger.getLogger(SendProxy.class.getName());
- LogHandler logHandler = new LogHandler();
- log.addHandler(logHandler);
-
- log.setLevel(LogLevel.INFO);
- sendMessage(0, null);
- assertNull(logHandler.trace);
-
- log.setLevel(LogLevel.DEBUG);
- sendMessage(0, null);
- assertNull(logHandler.trace);
-
- sendMessage(1, new Error(ErrorCode.FATAL_ERROR, "err"));
- assertNull(logHandler.trace);
-
- sendMessage(0, new Error(ErrorCode.FATAL_ERROR, "err"));
- assertEquals("Trace for reply with error(s):\n" +
- "<trace>\n" +
- " <trace>\n" +
- " Sending message (version ${VERSION}) from client to 'dst/session' with x seconds timeout.\n" +
- " <trace>\n" +
- " Message (type 1) received at 'dst' for session 'session'.\n" +
- " [FATAL_ERROR @ localhost]: err\n" +
- " Sending reply (version ${VERSION}) from 'dst'.\n" +
- " </trace>\n" +
- " Reply (type 2) received at client.\n" +
- " </trace>\n" +
- "</trace>\n", logHandler.trace);
- logHandler.trace = null;
-
- log.setLevel(LogLevel.SPAM);
- sendMessage(1, null);
- assertNull(logHandler.trace);
-
- sendMessage(0, null);
- assertEquals("Trace for reply:\n" +
- "<trace>\n" +
- " <trace>\n" +
- " Sending message (version ${VERSION}) from client to 'dst/session' with x seconds timeout.\n" +
- " <trace>\n" +
- " Message (type 1) received at 'dst' for session 'session'.\n" +
- " Sending reply (version ${VERSION}) from 'dst'.\n" +
- " </trace>\n" +
- " Reply (type 0) received at client.\n" +
- " </trace>\n" +
- "</trace>\n", logHandler.trace);
- logHandler.trace = null;
-
- sendMessage(1, new Error(ErrorCode.FATAL_ERROR, "err"));
- assertNull(logHandler.trace);
-
- sendMessage(0, new Error(ErrorCode.FATAL_ERROR, "err"));
- assertEquals("Trace for reply with error(s):\n" +
- "<trace>\n" +
- " <trace>\n" +
- " Sending message (version ${VERSION}) from client to 'dst/session' with x seconds timeout.\n" +
- " <trace>\n" +
- " Message (type 1) received at 'dst' for session 'session'.\n" +
- " [FATAL_ERROR @ localhost]: err\n" +
- " Sending reply (version ${VERSION}) from 'dst'.\n" +
- " </trace>\n" +
- " Reply (type 2) received at client.\n" +
- " </trace>\n" +
- "</trace>\n", logHandler.trace);
- logHandler.trace = null;
- }
-
- private void sendMessage(int traceLevel, Error err) {
- Message msg = new SimpleMessage("foo");
- msg.getTrace().setLevel(traceLevel);
- assertTrue(srcSession.send(msg, Route.parse("dst/session")).isAccepted());
- assertNotNull(msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60));
- if (err != null) {
- Reply reply = new SimpleReply("bar");
- reply.swapState(msg);
- reply.addError(err);
- dstSession.reply(reply);
- } else {
- dstSession.acknowledge(msg);
- }
- Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60);
- assertNotNull(reply);
- }
-
- private static class LogHandler extends Handler {
-
- String trace = null;
-
- @Override
- public void publish(LogRecord record) {
- String msg = record.getMessage();
- if (msg.startsWith("Trace ")) {
- msg = msg.replaceAll("\\[.*\\] ", "");
- msg = msg.replaceAll("[0-9]+\\.[0-9]+ seconds", "x seconds");
-
- String ver = Vtag.currentVersion.toString();
- for (int i = msg.indexOf(ver); i >= 0; i = msg.indexOf(ver, i)) {
- msg = msg.substring(0, i) + "${VERSION}" + msg.substring(i + ver.length());
- }
- trace = msg;
- }
- }
-
- @Override
- public void flush() {
- // empty
- }
-
- @Override
- public void close() throws SecurityException {
- // empty
- }
- }
-}
+package com.yahoo.messagebus; + +import com.yahoo.component.Vtag; +import com.yahoo.jrt.ListenFailedException; +import com.yahoo.jrt.slobrok.server.Slobrok; +import com.yahoo.log.LogLevel; +import com.yahoo.messagebus.network.Identity; +import com.yahoo.messagebus.network.rpc.RPCNetworkParams; +import com.yahoo.messagebus.network.rpc.test.TestServer; +import com.yahoo.messagebus.routing.Route; +import com.yahoo.messagebus.test.Receptor; +import com.yahoo.messagebus.test.SimpleMessage; +import com.yahoo.messagebus.test.SimpleProtocol; +import com.yahoo.messagebus.test.SimpleReply; +import junit.framework.TestCase; + +import java.net.UnknownHostException; +import java.util.logging.Handler; +import java.util.logging.LogRecord; +import java.util.logging.Logger; + +/** + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +public class SendProxyTestCase extends TestCase { + + Slobrok slobrok; + TestServer srcServer, dstServer; + SourceSession srcSession; + DestinationSession dstSession; + + @Override + public void setUp() throws UnknownHostException, ListenFailedException { + slobrok = new Slobrok(); + dstServer = new TestServer(new MessageBusParams().addProtocol(new SimpleProtocol()), + new RPCNetworkParams().setIdentity(new Identity("dst")).setSlobrokConfigId(TestServer.getSlobrokConfig(slobrok))); + dstSession = dstServer.mb.createDestinationSession(new DestinationSessionParams().setName("session").setMessageHandler(new Receptor())); + srcServer = new TestServer(new MessageBusParams().addProtocol(new SimpleProtocol()), + new RPCNetworkParams().setSlobrokConfigId(TestServer.getSlobrokConfig(slobrok))); + srcSession = srcServer.mb.createSourceSession( + new SourceSessionParams().setTimeout(600.0).setThrottlePolicy(null).setReplyHandler(new Receptor())); + assertTrue(srcServer.waitSlobrok("dst/session", 1)); + } + + @Override + public void tearDown() { + slobrok.stop(); + dstSession.destroy(); + dstServer.destroy(); + srcSession.destroy(); + srcServer.destroy(); + } + + public void testTraceByLogLevel() { + Logger log = Logger.getLogger(SendProxy.class.getName()); + LogHandler logHandler = new LogHandler(); + log.addHandler(logHandler); + + log.setLevel(LogLevel.INFO); + sendMessage(0, null); + assertNull(logHandler.trace); + + log.setLevel(LogLevel.DEBUG); + sendMessage(0, null); + assertNull(logHandler.trace); + + sendMessage(1, new Error(ErrorCode.FATAL_ERROR, "err")); + assertNull(logHandler.trace); + + sendMessage(0, new Error(ErrorCode.FATAL_ERROR, "err")); + assertEquals("Trace for reply with error(s):\n" + + "<trace>\n" + + " <trace>\n" + + " Sending message (version ${VERSION}) from client to 'dst/session' with x seconds timeout.\n" + + " <trace>\n" + + " Message (type 1) received at 'dst' for session 'session'.\n" + + " [FATAL_ERROR @ localhost]: err\n" + + " Sending reply (version ${VERSION}) from 'dst'.\n" + + " </trace>\n" + + " Reply (type 2) received at client.\n" + + " </trace>\n" + + "</trace>\n", logHandler.trace); + logHandler.trace = null; + + log.setLevel(LogLevel.SPAM); + sendMessage(1, null); + assertNull(logHandler.trace); + + sendMessage(0, null); + assertEquals("Trace for reply:\n" + + "<trace>\n" + + " <trace>\n" + + " Sending message (version ${VERSION}) from client to 'dst/session' with x seconds timeout.\n" + + " <trace>\n" + + " Message (type 1) received at 'dst' for session 'session'.\n" + + " Sending reply (version ${VERSION}) from 'dst'.\n" + + " </trace>\n" + + " Reply (type 0) received at client.\n" + + " </trace>\n" + + "</trace>\n", logHandler.trace); + logHandler.trace = null; + + sendMessage(1, new Error(ErrorCode.FATAL_ERROR, "err")); + assertNull(logHandler.trace); + + sendMessage(0, new Error(ErrorCode.FATAL_ERROR, "err")); + assertEquals("Trace for reply with error(s):\n" + + "<trace>\n" + + " <trace>\n" + + " Sending message (version ${VERSION}) from client to 'dst/session' with x seconds timeout.\n" + + " <trace>\n" + + " Message (type 1) received at 'dst' for session 'session'.\n" + + " [FATAL_ERROR @ localhost]: err\n" + + " Sending reply (version ${VERSION}) from 'dst'.\n" + + " </trace>\n" + + " Reply (type 2) received at client.\n" + + " </trace>\n" + + "</trace>\n", logHandler.trace); + logHandler.trace = null; + } + + private void sendMessage(int traceLevel, Error err) { + Message msg = new SimpleMessage("foo"); + msg.getTrace().setLevel(traceLevel); + assertTrue(srcSession.send(msg, Route.parse("dst/session")).isAccepted()); + assertNotNull(msg = ((Receptor)dstSession.getMessageHandler()).getMessage(60)); + if (err != null) { + Reply reply = new SimpleReply("bar"); + reply.swapState(msg); + reply.addError(err); + dstSession.reply(reply); + } else { + dstSession.acknowledge(msg); + } + Reply reply = ((Receptor)srcSession.getReplyHandler()).getReply(60); + assertNotNull(reply); + } + + private static class LogHandler extends Handler { + + String trace = null; + + @Override + public void publish(LogRecord record) { + String msg = record.getMessage(); + if (msg.startsWith("Trace ")) { + msg = msg.replaceAll("\\[.*\\] ", ""); + msg = msg.replaceAll("[0-9]+\\.[0-9]+ seconds", "x seconds"); + + String ver = Vtag.currentVersion.toString(); + for (int i = msg.indexOf(ver); i >= 0; i = msg.indexOf(ver, i)) { + msg = msg.substring(0, i) + "${VERSION}" + msg.substring(i + ver.length()); + } + trace = msg; + } + } + + @Override + public void flush() { + // empty + } + + @Override + public void close() throws SecurityException { + // empty + } + } +} diff --git a/messagebus/src/test/java/com/yahoo/messagebus/SimpleTripTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/SimpleTripTestCase.java index 2795758d922..e0906a6feb4 100755 --- a/messagebus/src/test/java/com/yahoo/messagebus/SimpleTripTestCase.java +++ b/messagebus/src/test/java/com/yahoo/messagebus/SimpleTripTestCase.java @@ -1,53 +1,53 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.messagebus;
-
-import com.yahoo.jrt.ListenFailedException;
-import com.yahoo.jrt.slobrok.server.Slobrok;
-import com.yahoo.messagebus.network.Identity;
-import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
-import com.yahoo.messagebus.network.rpc.test.TestServer;
-import com.yahoo.messagebus.routing.Route;
-import com.yahoo.messagebus.test.Receptor;
-import com.yahoo.messagebus.test.SimpleMessage;
-import com.yahoo.messagebus.test.SimpleProtocol;
-import com.yahoo.messagebus.test.SimpleReply;
-
-import java.net.UnknownHostException;
-
-/**
- * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
- */
-public class SimpleTripTestCase extends junit.framework.TestCase {
-
- public void testSimpleTrip() throws ListenFailedException, UnknownHostException {
- Slobrok slobrok = new Slobrok();
- TestServer server = new TestServer(new MessageBusParams().addProtocol(new SimpleProtocol()),
- new RPCNetworkParams()
- .setIdentity(new Identity("srv"))
- .setSlobrokConfigId(TestServer.getSlobrokConfig(slobrok)));
- DestinationSession dst = server.mb.createDestinationSession(new DestinationSessionParams().setName("session").setMessageHandler(new Receptor()));
- SourceSession src = server.mb.createSourceSession(
- new SourceSessionParams().setTimeout(600.0).setReplyHandler(new Receptor()));
- assertTrue(server.waitSlobrok("srv/session", 1));
-
- assertTrue(src.send(new SimpleMessage("msg"), Route.parse("srv/session")).isAccepted());
- Message msg = ((Receptor)dst.getMessageHandler()).getMessage(60);
- assertNotNull(msg);
- assertEquals(SimpleProtocol.NAME, msg.getProtocol());
- assertEquals(SimpleProtocol.MESSAGE, msg.getType());
- assertEquals("msg", ((SimpleMessage)msg).getValue());
-
- Reply reply = new SimpleReply("reply");
- reply.swapState(msg);
- dst.reply(reply);
-
- assertNotNull(reply = ((Receptor)src.getReplyHandler()).getReply(60));
- assertEquals(SimpleProtocol.NAME, reply.getProtocol());
- assertEquals(SimpleProtocol.REPLY, reply.getType());
- assertEquals("reply", ((SimpleReply)reply).getValue());
-
- src.destroy();
- dst.destroy();
- server.destroy();
- }
-}
+package com.yahoo.messagebus; + +import com.yahoo.jrt.ListenFailedException; +import com.yahoo.jrt.slobrok.server.Slobrok; +import com.yahoo.messagebus.network.Identity; +import com.yahoo.messagebus.network.rpc.RPCNetworkParams; +import com.yahoo.messagebus.network.rpc.test.TestServer; +import com.yahoo.messagebus.routing.Route; +import com.yahoo.messagebus.test.Receptor; +import com.yahoo.messagebus.test.SimpleMessage; +import com.yahoo.messagebus.test.SimpleProtocol; +import com.yahoo.messagebus.test.SimpleReply; + +import java.net.UnknownHostException; + +/** + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +public class SimpleTripTestCase extends junit.framework.TestCase { + + public void testSimpleTrip() throws ListenFailedException, UnknownHostException { + Slobrok slobrok = new Slobrok(); + TestServer server = new TestServer(new MessageBusParams().addProtocol(new SimpleProtocol()), + new RPCNetworkParams() + .setIdentity(new Identity("srv")) + .setSlobrokConfigId(TestServer.getSlobrokConfig(slobrok))); + DestinationSession dst = server.mb.createDestinationSession(new DestinationSessionParams().setName("session").setMessageHandler(new Receptor())); + SourceSession src = server.mb.createSourceSession( + new SourceSessionParams().setTimeout(600.0).setReplyHandler(new Receptor())); + assertTrue(server.waitSlobrok("srv/session", 1)); + + assertTrue(src.send(new SimpleMessage("msg"), Route.parse("srv/session")).isAccepted()); + Message msg = ((Receptor)dst.getMessageHandler()).getMessage(60); + assertNotNull(msg); + assertEquals(SimpleProtocol.NAME, msg.getProtocol()); + assertEquals(SimpleProtocol.MESSAGE, msg.getType()); + assertEquals("msg", ((SimpleMessage)msg).getValue()); + + Reply reply = new SimpleReply("reply"); + reply.swapState(msg); + dst.reply(reply); + + assertNotNull(reply = ((Receptor)src.getReplyHandler()).getReply(60)); + assertEquals(SimpleProtocol.NAME, reply.getProtocol()); + assertEquals(SimpleProtocol.REPLY, reply.getType()); + assertEquals("reply", ((SimpleReply)reply).getValue()); + + src.destroy(); + dst.destroy(); + server.destroy(); + } +} diff --git a/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/SendAdapterTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/SendAdapterTestCase.java index d296d7c7058..a03066ec745 100755 --- a/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/SendAdapterTestCase.java +++ b/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/SendAdapterTestCase.java @@ -1,157 +1,157 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.messagebus.network.rpc;
-
-import com.yahoo.component.Version;
-import com.yahoo.jrt.ListenFailedException;
-import com.yahoo.jrt.slobrok.server.Slobrok;
-import com.yahoo.messagebus.*;
-import com.yahoo.messagebus.network.Identity;
-import com.yahoo.messagebus.network.rpc.test.TestServer;
-import com.yahoo.messagebus.routing.Route;
-import com.yahoo.messagebus.test.Receptor;
-import com.yahoo.messagebus.test.SimpleMessage;
-import com.yahoo.messagebus.test.SimpleProtocol;
-import com.yahoo.messagebus.test.SimpleReply;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.net.UnknownHostException;
-import java.util.Arrays;
-import java.util.List;
-
-import static org.junit.Assert.*;
-
-/**
- * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
- */
-public class SendAdapterTestCase {
-
- ////////////////////////////////////////////////////////////////////////////////
- //
- // Setup
- //
- ////////////////////////////////////////////////////////////////////////////////
-
- Slobrok slobrok;
- TestServer srcServer, itrServer, dstServer;
- SourceSession srcSession;
- IntermediateSession itrSession;
- DestinationSession dstSession;
- TestProtocol srcProtocol, itrProtocol, dstProtocol;
-
- @Before
- public void setUp() throws ListenFailedException, UnknownHostException {
- slobrok = new Slobrok();
- dstServer = new TestServer(
- new MessageBusParams().addProtocol(dstProtocol = new TestProtocol()),
- new RPCNetworkParams().setIdentity(new Identity("dst")).setSlobrokConfigId(TestServer.getSlobrokConfig(slobrok)));
- dstSession = dstServer.mb.createDestinationSession(
- new DestinationSessionParams().setName("session").setMessageHandler(new Receptor()));
- itrServer = new TestServer(
- new MessageBusParams().addProtocol(itrProtocol = new TestProtocol()),
- new RPCNetworkParams().setIdentity(new Identity("itr")).setSlobrokConfigId(TestServer.getSlobrokConfig(slobrok)));
- itrSession = itrServer.mb.createIntermediateSession(
- new IntermediateSessionParams().setName("session").setMessageHandler(new Receptor()).setReplyHandler(new Receptor()));
- srcServer = new TestServer(
- new MessageBusParams().addProtocol(srcProtocol = new TestProtocol()),
- new RPCNetworkParams().setSlobrokConfigId(TestServer.getSlobrokConfig(slobrok)));
- srcSession = srcServer.mb.createSourceSession(
- new SourceSessionParams().setTimeout(600.0).setReplyHandler(new Receptor()));
- assertTrue(srcServer.waitSlobrok("*/session", 2));
- }
-
- @After
- public void tearDown() {
- slobrok.stop();
- dstSession.destroy();
- dstServer.destroy();
- itrSession.destroy();
- itrServer.destroy();
- srcSession.destroy();
- srcServer.destroy();
- }
-
- ////////////////////////////////////////////////////////////////////////////////
- //
- // Tests
- //
- ////////////////////////////////////////////////////////////////////////////////
-
- @Test
- public void requireThatMessagesCanBeSentAcrossAllSupportedVersions() throws Exception {
- List<Version> versions = Arrays.asList(new Version(5, 0), new Version(5, 1));
- for (Version srcVersion : versions) {
- for (Version itrVersion : versions) {
- for (Version dstVersion : versions) {
- assertVersionedSend(srcVersion, itrVersion, dstVersion);
- }
- }
- }
- }
-
- ////////////////////////////////////////////////////////////////////////////////
- //
- // Utilities
- //
- ////////////////////////////////////////////////////////////////////////////////
-
- private void assertVersionedSend(Version srcVersion, Version itrVersion, Version dstVersion) {
- System.out.println("Sending from " + srcVersion + " through " + itrVersion + " to " + dstVersion + ":");
- srcServer.net.setVersion(srcVersion);
- itrServer.net.setVersion(itrVersion);
- dstServer.net.setVersion(dstVersion);
-
- Message msg = new SimpleMessage("foo");
- msg.getTrace().setLevel(9);
- assertTrue(srcSession.send(msg, Route.parse("itr/session dst/session")).isAccepted());
- assertNotNull(msg = ((Receptor)itrSession.getMessageHandler()).getMessage(300));
- System.out.println("\tMessage version " + srcProtocol.lastVersion + " serialized at source.");
- Version minVersion = srcVersion.compareTo(itrVersion) < 0 ? srcVersion : itrVersion;
- assertEquals(minVersion, srcProtocol.lastVersion);
-
- System.out.println("\tMessage version " + itrProtocol.lastVersion + " reached intermediate.");
- assertEquals(minVersion, itrProtocol.lastVersion);
- itrSession.forward(msg);
- assertNotNull(msg = ((Receptor)dstSession.getMessageHandler()).getMessage(300));
- System.out.println("\tMessage version " + itrProtocol.lastVersion + " serialized at intermediate.");
- minVersion = itrVersion.compareTo(dstVersion) < 0 ? itrVersion : dstVersion;
- assertEquals(minVersion, itrProtocol.lastVersion);
-
- System.out.println("\tMessage version " + dstProtocol.lastVersion + " reached destination.");
- assertEquals(minVersion, dstProtocol.lastVersion);
- Reply reply = new SimpleReply("bar");
- reply.swapState(msg);
- dstSession.reply(reply);
- assertNotNull(reply = ((Receptor)itrSession.getReplyHandler()).getReply(300));
- System.out.println("\tReply version " + dstProtocol.lastVersion + " serialized at destination.");
- assertEquals(minVersion, dstProtocol.lastVersion);
-
- System.out.println("\tReply version " + itrProtocol.lastVersion + " reached intermediate.");
- assertEquals(minVersion, itrProtocol.lastVersion);
- itrSession.forward(reply);
- assertNotNull(((Receptor)srcSession.getReplyHandler()).getReply(300));
- System.out.println("\tReply version " + itrProtocol.lastVersion + " serialized at intermediate.");
- minVersion = srcVersion.compareTo(itrVersion) < 0 ? srcVersion : itrVersion;
- assertEquals(minVersion, itrProtocol.lastVersion);
-
- System.out.println("\tReply version " + srcProtocol.lastVersion + " reached source.");
- assertEquals(minVersion, srcProtocol.lastVersion);
- }
-
- private static class TestProtocol extends SimpleProtocol {
-
- Version lastVersion;
-
- @Override
- public byte[] encode(Version version, Routable routable) {
- lastVersion = version;
- return super.encode(version, routable);
- }
-
- public Routable decode(Version version, byte[] payload) {
- lastVersion = version;
- return super.decode(version, payload);
- }
- }
-}
+package com.yahoo.messagebus.network.rpc; + +import com.yahoo.component.Version; +import com.yahoo.jrt.ListenFailedException; +import com.yahoo.jrt.slobrok.server.Slobrok; +import com.yahoo.messagebus.*; +import com.yahoo.messagebus.network.Identity; +import com.yahoo.messagebus.network.rpc.test.TestServer; +import com.yahoo.messagebus.routing.Route; +import com.yahoo.messagebus.test.Receptor; +import com.yahoo.messagebus.test.SimpleMessage; +import com.yahoo.messagebus.test.SimpleProtocol; +import com.yahoo.messagebus.test.SimpleReply; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.net.UnknownHostException; +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.*; + +/** + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +public class SendAdapterTestCase { + + //////////////////////////////////////////////////////////////////////////////// + // + // Setup + // + //////////////////////////////////////////////////////////////////////////////// + + Slobrok slobrok; + TestServer srcServer, itrServer, dstServer; + SourceSession srcSession; + IntermediateSession itrSession; + DestinationSession dstSession; + TestProtocol srcProtocol, itrProtocol, dstProtocol; + + @Before + public void setUp() throws ListenFailedException, UnknownHostException { + slobrok = new Slobrok(); + dstServer = new TestServer( + new MessageBusParams().addProtocol(dstProtocol = new TestProtocol()), + new RPCNetworkParams().setIdentity(new Identity("dst")).setSlobrokConfigId(TestServer.getSlobrokConfig(slobrok))); + dstSession = dstServer.mb.createDestinationSession( + new DestinationSessionParams().setName("session").setMessageHandler(new Receptor())); + itrServer = new TestServer( + new MessageBusParams().addProtocol(itrProtocol = new TestProtocol()), + new RPCNetworkParams().setIdentity(new Identity("itr")).setSlobrokConfigId(TestServer.getSlobrokConfig(slobrok))); + itrSession = itrServer.mb.createIntermediateSession( + new IntermediateSessionParams().setName("session").setMessageHandler(new Receptor()).setReplyHandler(new Receptor())); + srcServer = new TestServer( + new MessageBusParams().addProtocol(srcProtocol = new TestProtocol()), + new RPCNetworkParams().setSlobrokConfigId(TestServer.getSlobrokConfig(slobrok))); + srcSession = srcServer.mb.createSourceSession( + new SourceSessionParams().setTimeout(600.0).setReplyHandler(new Receptor())); + assertTrue(srcServer.waitSlobrok("*/session", 2)); + } + + @After + public void tearDown() { + slobrok.stop(); + dstSession.destroy(); + dstServer.destroy(); + itrSession.destroy(); + itrServer.destroy(); + srcSession.destroy(); + srcServer.destroy(); + } + + //////////////////////////////////////////////////////////////////////////////// + // + // Tests + // + //////////////////////////////////////////////////////////////////////////////// + + @Test + public void requireThatMessagesCanBeSentAcrossAllSupportedVersions() throws Exception { + List<Version> versions = Arrays.asList(new Version(5, 0), new Version(5, 1)); + for (Version srcVersion : versions) { + for (Version itrVersion : versions) { + for (Version dstVersion : versions) { + assertVersionedSend(srcVersion, itrVersion, dstVersion); + } + } + } + } + + //////////////////////////////////////////////////////////////////////////////// + // + // Utilities + // + //////////////////////////////////////////////////////////////////////////////// + + private void assertVersionedSend(Version srcVersion, Version itrVersion, Version dstVersion) { + System.out.println("Sending from " + srcVersion + " through " + itrVersion + " to " + dstVersion + ":"); + srcServer.net.setVersion(srcVersion); + itrServer.net.setVersion(itrVersion); + dstServer.net.setVersion(dstVersion); + + Message msg = new SimpleMessage("foo"); + msg.getTrace().setLevel(9); + assertTrue(srcSession.send(msg, Route.parse("itr/session dst/session")).isAccepted()); + assertNotNull(msg = ((Receptor)itrSession.getMessageHandler()).getMessage(300)); + System.out.println("\tMessage version " + srcProtocol.lastVersion + " serialized at source."); + Version minVersion = srcVersion.compareTo(itrVersion) < 0 ? srcVersion : itrVersion; + assertEquals(minVersion, srcProtocol.lastVersion); + + System.out.println("\tMessage version " + itrProtocol.lastVersion + " reached intermediate."); + assertEquals(minVersion, itrProtocol.lastVersion); + itrSession.forward(msg); + assertNotNull(msg = ((Receptor)dstSession.getMessageHandler()).getMessage(300)); + System.out.println("\tMessage version " + itrProtocol.lastVersion + " serialized at intermediate."); + minVersion = itrVersion.compareTo(dstVersion) < 0 ? itrVersion : dstVersion; + assertEquals(minVersion, itrProtocol.lastVersion); + + System.out.println("\tMessage version " + dstProtocol.lastVersion + " reached destination."); + assertEquals(minVersion, dstProtocol.lastVersion); + Reply reply = new SimpleReply("bar"); + reply.swapState(msg); + dstSession.reply(reply); + assertNotNull(reply = ((Receptor)itrSession.getReplyHandler()).getReply(300)); + System.out.println("\tReply version " + dstProtocol.lastVersion + " serialized at destination."); + assertEquals(minVersion, dstProtocol.lastVersion); + + System.out.println("\tReply version " + itrProtocol.lastVersion + " reached intermediate."); + assertEquals(minVersion, itrProtocol.lastVersion); + itrSession.forward(reply); + assertNotNull(((Receptor)srcSession.getReplyHandler()).getReply(300)); + System.out.println("\tReply version " + itrProtocol.lastVersion + " serialized at intermediate."); + minVersion = srcVersion.compareTo(itrVersion) < 0 ? srcVersion : itrVersion; + assertEquals(minVersion, itrProtocol.lastVersion); + + System.out.println("\tReply version " + srcProtocol.lastVersion + " reached source."); + assertEquals(minVersion, srcProtocol.lastVersion); + } + + private static class TestProtocol extends SimpleProtocol { + + Version lastVersion; + + @Override + public byte[] encode(Version version, Routable routable) { + lastVersion = version; + return super.encode(version, routable); + } + + public Routable decode(Version version, byte[] payload) { + lastVersion = version; + return super.decode(version, payload); + } + } +} diff --git a/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/ServiceAddressTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/ServiceAddressTestCase.java index e69896e2bcf..dfc5b648eb5 100755 --- a/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/ServiceAddressTestCase.java +++ b/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/ServiceAddressTestCase.java @@ -1,90 +1,90 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.messagebus.network.rpc;
-
-import com.yahoo.jrt.ListenFailedException;
-import com.yahoo.jrt.Spec;
-import com.yahoo.jrt.slobrok.api.Mirror;
-import com.yahoo.jrt.slobrok.server.Slobrok;
-import com.yahoo.messagebus.network.Identity;
-
-import java.net.UnknownHostException;
-
-/**
- * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
- */
-public class ServiceAddressTestCase extends junit.framework.TestCase {
-
- private Slobrok slobrok;
- private RPCNetwork network;
-
- public ServiceAddressTestCase(String msg) {
- super(msg);
- }
-
- public void setUp() throws ListenFailedException, UnknownHostException {
- slobrok = new Slobrok();
- network = new RPCNetwork(new RPCNetworkParams()
- .setIdentity(new Identity("foo"))
- .setSlobrokConfigId("raw:slobrok[1]\nslobrok[0].connectionspec \"" +
- new Spec("localhost", slobrok.port()).toString() + "\"\n"));
- }
-
- public void tearDown() {
- network.shutdown();
- slobrok.stop();
- }
-
- public void testAddrServiceAddress() {
- assertNullAddress("tcp");
- assertNullAddress("tcp/");
- assertNullAddress("tcp/localhost");
- assertNullAddress("tcp/localhost:");
- assertNullAddress("tcp/localhost:1977");
- assertNullAddress("tcp/localhost:1977/");
- assertAddress("tcp/localhost:1977/session", "tcp/localhost:1977", "session");
- assertNullAddress("tcp/localhost:/session");
- //assertNullAddress("tcp/:1977/session");
- assertNullAddress("tcp/:/session");
- }
-
- public void testNameServiceAddress() {
- network.unregisterSession("session");
- assertTrue(waitSlobrok("foo/session", 0));
- assertNullAddress("foo/session");
-
- network.registerSession("session");
- assertTrue(waitSlobrok("foo/session", 1));
- assertAddress("foo/session", network.getConnectionSpec(), "session");
- }
-
- private boolean waitSlobrok(String pattern, int num) {
- for (int i = 0; i < 1000 && !Thread.currentThread().isInterrupted(); ++i) {
- Mirror.Entry[] res = network.getMirror().lookup(pattern);
- if (res.length == num) {
- return true;
- }
- try {
- Thread.sleep(10);
- }
- catch (InterruptedException e) {
- // ignore
- }
- }
- return false;
- }
-
- private void assertNullAddress(String pattern) {
- assertNull(new RPCService(network.getMirror(), pattern).resolve());
- }
-
- private void assertAddress(String pattern, String expectedSpec, String expectedSession) {
- RPCService service = new RPCService(network.getMirror(), pattern);
- RPCServiceAddress obj = service.resolve();
- assertNotNull(obj);
- assertNotNull(obj.getConnectionSpec());
- assertEquals(expectedSpec, obj.getConnectionSpec().toString());
- if (expectedSession != null) {
- assertEquals(expectedSession, obj.getSessionName());
- }
- }
-}
+package com.yahoo.messagebus.network.rpc; + +import com.yahoo.jrt.ListenFailedException; +import com.yahoo.jrt.Spec; +import com.yahoo.jrt.slobrok.api.Mirror; +import com.yahoo.jrt.slobrok.server.Slobrok; +import com.yahoo.messagebus.network.Identity; + +import java.net.UnknownHostException; + +/** + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +public class ServiceAddressTestCase extends junit.framework.TestCase { + + private Slobrok slobrok; + private RPCNetwork network; + + public ServiceAddressTestCase(String msg) { + super(msg); + } + + public void setUp() throws ListenFailedException, UnknownHostException { + slobrok = new Slobrok(); + network = new RPCNetwork(new RPCNetworkParams() + .setIdentity(new Identity("foo")) + .setSlobrokConfigId("raw:slobrok[1]\nslobrok[0].connectionspec \"" + + new Spec("localhost", slobrok.port()).toString() + "\"\n")); + } + + public void tearDown() { + network.shutdown(); + slobrok.stop(); + } + + public void testAddrServiceAddress() { + assertNullAddress("tcp"); + assertNullAddress("tcp/"); + assertNullAddress("tcp/localhost"); + assertNullAddress("tcp/localhost:"); + assertNullAddress("tcp/localhost:1977"); + assertNullAddress("tcp/localhost:1977/"); + assertAddress("tcp/localhost:1977/session", "tcp/localhost:1977", "session"); + assertNullAddress("tcp/localhost:/session"); + //assertNullAddress("tcp/:1977/session"); + assertNullAddress("tcp/:/session"); + } + + public void testNameServiceAddress() { + network.unregisterSession("session"); + assertTrue(waitSlobrok("foo/session", 0)); + assertNullAddress("foo/session"); + + network.registerSession("session"); + assertTrue(waitSlobrok("foo/session", 1)); + assertAddress("foo/session", network.getConnectionSpec(), "session"); + } + + private boolean waitSlobrok(String pattern, int num) { + for (int i = 0; i < 1000 && !Thread.currentThread().isInterrupted(); ++i) { + Mirror.Entry[] res = network.getMirror().lookup(pattern); + if (res.length == num) { + return true; + } + try { + Thread.sleep(10); + } + catch (InterruptedException e) { + // ignore + } + } + return false; + } + + private void assertNullAddress(String pattern) { + assertNull(new RPCService(network.getMirror(), pattern).resolve()); + } + + private void assertAddress(String pattern, String expectedSpec, String expectedSession) { + RPCService service = new RPCService(network.getMirror(), pattern); + RPCServiceAddress obj = service.resolve(); + assertNotNull(obj); + assertNotNull(obj.getConnectionSpec()); + assertEquals(expectedSpec, obj.getConnectionSpec().toString()); + if (expectedSession != null) { + assertEquals(expectedSession, obj.getSessionName()); + } + } +} diff --git a/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/ServicePoolTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/ServicePoolTestCase.java index 42580b963a5..a35f98183da 100644 --- a/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/ServicePoolTestCase.java +++ b/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/ServicePoolTestCase.java @@ -1,57 +1,57 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.messagebus.network.rpc;
-
-import com.yahoo.jrt.ListenFailedException;
-import com.yahoo.jrt.slobrok.server.Slobrok;
-import com.yahoo.messagebus.network.rpc.test.TestServer;
-import junit.framework.TestCase;
-
-/**
- * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
- */
-public class ServicePoolTestCase extends TestCase {
-
- public void testMaxSize() throws ListenFailedException {
- Slobrok slobrok = new Slobrok();
- RPCNetwork net = new RPCNetwork(new RPCNetworkParams().setSlobrokConfigId(TestServer.getSlobrokConfig(slobrok)));
- RPCServicePool pool = new RPCServicePool(net, 2);
-
- pool.resolve("foo");
- assertEquals(1, pool.getSize());
- assertTrue(pool.hasService("foo"));
- assertTrue(!pool.hasService("bar"));
- assertTrue(!pool.hasService("baz"));
-
- pool.resolve("foo");
- assertEquals(1, pool.getSize());
- assertTrue(pool.hasService("foo"));
- assertTrue(!pool.hasService("bar"));
- assertTrue(!pool.hasService("baz"));
-
- pool.resolve("bar");
- assertEquals(2, pool.getSize());
- assertTrue(pool.hasService("foo"));
- assertTrue(pool.hasService("bar"));
- assertTrue(!pool.hasService("baz"));
-
- pool.resolve("baz");
- assertEquals(2, pool.getSize());
- assertTrue(!pool.hasService("foo"));
- assertTrue(pool.hasService("bar"));
- assertTrue(pool.hasService("baz"));
-
- pool.resolve("bar");
- assertEquals(2, pool.getSize());
- assertTrue(!pool.hasService("foo"));
- assertTrue(pool.hasService("bar"));
- assertTrue(pool.hasService("baz"));
-
- pool.resolve("foo");
- assertEquals(2, pool.getSize());
- assertTrue(pool.hasService("foo"));
- assertTrue(pool.hasService("bar"));
- assertTrue(!pool.hasService("baz"));
-
- slobrok.stop();
- }
+package com.yahoo.messagebus.network.rpc; + +import com.yahoo.jrt.ListenFailedException; +import com.yahoo.jrt.slobrok.server.Slobrok; +import com.yahoo.messagebus.network.rpc.test.TestServer; +import junit.framework.TestCase; + +/** + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +public class ServicePoolTestCase extends TestCase { + + public void testMaxSize() throws ListenFailedException { + Slobrok slobrok = new Slobrok(); + RPCNetwork net = new RPCNetwork(new RPCNetworkParams().setSlobrokConfigId(TestServer.getSlobrokConfig(slobrok))); + RPCServicePool pool = new RPCServicePool(net, 2); + + pool.resolve("foo"); + assertEquals(1, pool.getSize()); + assertTrue(pool.hasService("foo")); + assertTrue(!pool.hasService("bar")); + assertTrue(!pool.hasService("baz")); + + pool.resolve("foo"); + assertEquals(1, pool.getSize()); + assertTrue(pool.hasService("foo")); + assertTrue(!pool.hasService("bar")); + assertTrue(!pool.hasService("baz")); + + pool.resolve("bar"); + assertEquals(2, pool.getSize()); + assertTrue(pool.hasService("foo")); + assertTrue(pool.hasService("bar")); + assertTrue(!pool.hasService("baz")); + + pool.resolve("baz"); + assertEquals(2, pool.getSize()); + assertTrue(!pool.hasService("foo")); + assertTrue(pool.hasService("bar")); + assertTrue(pool.hasService("baz")); + + pool.resolve("bar"); + assertEquals(2, pool.getSize()); + assertTrue(!pool.hasService("foo")); + assertTrue(pool.hasService("bar")); + assertTrue(pool.hasService("baz")); + + pool.resolve("foo"); + assertEquals(2, pool.getSize()); + assertTrue(pool.hasService("foo")); + assertTrue(pool.hasService("bar")); + assertTrue(!pool.hasService("baz")); + + slobrok.stop(); + } }
\ No newline at end of file diff --git a/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/TargetPoolTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/TargetPoolTestCase.java index 8d58b4dd89f..821080524af 100755 --- a/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/TargetPoolTestCase.java +++ b/messagebus/src/test/java/com/yahoo/messagebus/network/rpc/TargetPoolTestCase.java @@ -1,112 +1,112 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.messagebus.network.rpc;
-
-import com.yahoo.jrt.ListenFailedException;
-import com.yahoo.jrt.Supervisor;
-import com.yahoo.jrt.Transport;
-import com.yahoo.jrt.slobrok.server.Slobrok;
-import com.yahoo.concurrent.Timer;
-import com.yahoo.messagebus.network.rpc.test.TestServer;
-
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
- */
-public class TargetPoolTestCase extends junit.framework.TestCase {
-
- private Slobrok slobrok;
- private List<TestServer> servers;
- private Supervisor orb;
-
- @Override
- public void setUp() throws ListenFailedException {
- slobrok = new Slobrok();
- servers = new ArrayList<>();
- orb = new Supervisor(new Transport());
- }
-
- @Override
- public void tearDown() {
- slobrok.stop();
- for (TestServer server : servers) {
- server.destroy();
- }
- orb.transport().shutdown().join();
- }
-
- public void testConnectionExpire() throws ListenFailedException, UnknownHostException {
- // Necessary setup to be able to resolve targets.
- RPCServiceAddress adr1 = registerServer();
- RPCServiceAddress adr2 = registerServer();
- RPCServiceAddress adr3 = registerServer();
-
- PoolTimer timer = new PoolTimer();
- RPCTargetPool pool = new RPCTargetPool(timer, 0.666);
-
- // Assert that all connections expire.
- RPCTarget target;
- assertNotNull(target = pool.getTarget(orb, adr1)); target.subRef();
- assertNotNull(target = pool.getTarget(orb, adr2)); target.subRef();
- assertNotNull(target = pool.getTarget(orb, adr3)); target.subRef();
- assertEquals(3, pool.size());
- for (int i = 0; i < 10; ++i) {
- pool.flushTargets(false);
- assertEquals(3, pool.size());
- }
- timer.millis += 999;
- pool.flushTargets(false);
- assertEquals(0, pool.size());
-
- // Assert that only idle connections expire.
- assertNotNull(target = pool.getTarget(orb, adr1)); target.subRef();
- assertNotNull(target = pool.getTarget(orb, adr2)); target.subRef();
- assertNotNull(target = pool.getTarget(orb, adr3)); target.subRef();
- assertEquals(3, pool.size());
- timer.millis += 444;
- pool.flushTargets(false);
- assertEquals(3, pool.size());
- assertNotNull(target = pool.getTarget(orb, adr2)); target.subRef();
- assertNotNull(target = pool.getTarget(orb, adr3)); target.subRef();
- assertEquals(3, pool.size());
- timer.millis += 444;
- pool.flushTargets(false);
- assertEquals(2, pool.size());
- assertNotNull(target = pool.getTarget(orb, adr3)); target.subRef();
- timer.millis += 444;
- pool.flushTargets(false);
- assertEquals(1, pool.size());
- timer.millis += 444;
- pool.flushTargets(false);
- assertEquals(0, pool.size());
-
- // Assert that connections never expire while they are referenced.
- assertNotNull(target = pool.getTarget(orb, adr1));
- assertEquals(1, pool.size());
- for (int i = 0; i < 10; ++i) {
- timer.millis += 999;
- pool.flushTargets(false);
- assertEquals(1, pool.size());
- }
- target.subRef();
- timer.millis += 999;
- pool.flushTargets(false);
- assertEquals(0, pool.size());
- }
-
- private RPCServiceAddress registerServer() throws ListenFailedException, UnknownHostException {
- servers.add(new TestServer("srv" + servers.size(), null, slobrok, null, null));
- return new RPCServiceAddress("foo/bar", servers.get(servers.size() - 1).mb.getConnectionSpec());
- }
-
- private static class PoolTimer implements Timer {
- long millis = 0;
-
- @Override
- public long milliTime() {
- return millis;
- }
- }
+package com.yahoo.messagebus.network.rpc; + +import com.yahoo.jrt.ListenFailedException; +import com.yahoo.jrt.Supervisor; +import com.yahoo.jrt.Transport; +import com.yahoo.jrt.slobrok.server.Slobrok; +import com.yahoo.concurrent.Timer; +import com.yahoo.messagebus.network.rpc.test.TestServer; + +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; + +/** + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +public class TargetPoolTestCase extends junit.framework.TestCase { + + private Slobrok slobrok; + private List<TestServer> servers; + private Supervisor orb; + + @Override + public void setUp() throws ListenFailedException { + slobrok = new Slobrok(); + servers = new ArrayList<>(); + orb = new Supervisor(new Transport()); + } + + @Override + public void tearDown() { + slobrok.stop(); + for (TestServer server : servers) { + server.destroy(); + } + orb.transport().shutdown().join(); + } + + public void testConnectionExpire() throws ListenFailedException, UnknownHostException { + // Necessary setup to be able to resolve targets. + RPCServiceAddress adr1 = registerServer(); + RPCServiceAddress adr2 = registerServer(); + RPCServiceAddress adr3 = registerServer(); + + PoolTimer timer = new PoolTimer(); + RPCTargetPool pool = new RPCTargetPool(timer, 0.666); + + // Assert that all connections expire. + RPCTarget target; + assertNotNull(target = pool.getTarget(orb, adr1)); target.subRef(); + assertNotNull(target = pool.getTarget(orb, adr2)); target.subRef(); + assertNotNull(target = pool.getTarget(orb, adr3)); target.subRef(); + assertEquals(3, pool.size()); + for (int i = 0; i < 10; ++i) { + pool.flushTargets(false); + assertEquals(3, pool.size()); + } + timer.millis += 999; + pool.flushTargets(false); + assertEquals(0, pool.size()); + + // Assert that only idle connections expire. + assertNotNull(target = pool.getTarget(orb, adr1)); target.subRef(); + assertNotNull(target = pool.getTarget(orb, adr2)); target.subRef(); + assertNotNull(target = pool.getTarget(orb, adr3)); target.subRef(); + assertEquals(3, pool.size()); + timer.millis += 444; + pool.flushTargets(false); + assertEquals(3, pool.size()); + assertNotNull(target = pool.getTarget(orb, adr2)); target.subRef(); + assertNotNull(target = pool.getTarget(orb, adr3)); target.subRef(); + assertEquals(3, pool.size()); + timer.millis += 444; + pool.flushTargets(false); + assertEquals(2, pool.size()); + assertNotNull(target = pool.getTarget(orb, adr3)); target.subRef(); + timer.millis += 444; + pool.flushTargets(false); + assertEquals(1, pool.size()); + timer.millis += 444; + pool.flushTargets(false); + assertEquals(0, pool.size()); + + // Assert that connections never expire while they are referenced. + assertNotNull(target = pool.getTarget(orb, adr1)); + assertEquals(1, pool.size()); + for (int i = 0; i < 10; ++i) { + timer.millis += 999; + pool.flushTargets(false); + assertEquals(1, pool.size()); + } + target.subRef(); + timer.millis += 999; + pool.flushTargets(false); + assertEquals(0, pool.size()); + } + + private RPCServiceAddress registerServer() throws ListenFailedException, UnknownHostException { + servers.add(new TestServer("srv" + servers.size(), null, slobrok, null, null)); + return new RPCServiceAddress("foo/bar", servers.get(servers.size() - 1).mb.getConnectionSpec()); + } + + private static class PoolTimer implements Timer { + long millis = 0; + + @Override + public long milliTime() { + return millis; + } + } }
\ No newline at end of file diff --git a/messagebus/src/test/java/com/yahoo/messagebus/routing/RetryPolicyTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/routing/RetryPolicyTestCase.java index ad13f3325c7..141681a277a 100644 --- a/messagebus/src/test/java/com/yahoo/messagebus/routing/RetryPolicyTestCase.java +++ b/messagebus/src/test/java/com/yahoo/messagebus/routing/RetryPolicyTestCase.java @@ -1,32 +1,32 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.messagebus.routing;
-
-import com.yahoo.messagebus.ErrorCode;
-import junit.framework.TestCase;
-
-/**
- * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
- */
-public class RetryPolicyTestCase extends TestCase {
-
- public void testSimpleRetryPolicy() {
- RetryTransientErrorsPolicy policy = new RetryTransientErrorsPolicy();
- for (int i = 0; i < 5; ++i) {
- double delay = i / 3.0;
- policy.setBaseDelay(delay);
- for (int j = 0; j < 5; ++j) {
- assertEquals((int)(j * delay), (int)policy.getRetryDelay(j));
- }
- for (int j = ErrorCode.NONE; j < ErrorCode.ERROR_LIMIT; ++j) {
- policy.setEnabled(true);
- if (j < ErrorCode.FATAL_ERROR) {
- assertTrue(policy.canRetry(j));
- } else {
- assertFalse(policy.canRetry(j));
- }
- policy.setEnabled(false);
- assertFalse(policy.canRetry(j));
- }
- }
- }
-}
+package com.yahoo.messagebus.routing; + +import com.yahoo.messagebus.ErrorCode; +import junit.framework.TestCase; + +/** + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +public class RetryPolicyTestCase extends TestCase { + + public void testSimpleRetryPolicy() { + RetryTransientErrorsPolicy policy = new RetryTransientErrorsPolicy(); + for (int i = 0; i < 5; ++i) { + double delay = i / 3.0; + policy.setBaseDelay(delay); + for (int j = 0; j < 5; ++j) { + assertEquals((int)(j * delay), (int)policy.getRetryDelay(j)); + } + for (int j = ErrorCode.NONE; j < ErrorCode.ERROR_LIMIT; ++j) { + policy.setEnabled(true); + if (j < ErrorCode.FATAL_ERROR) { + assertTrue(policy.canRetry(j)); + } else { + assertFalse(policy.canRetry(j)); + } + policy.setEnabled(false); + assertFalse(policy.canRetry(j)); + } + } + } +} diff --git a/messagebus/src/test/java/com/yahoo/messagebus/routing/RoutingSpecTestCase.java b/messagebus/src/test/java/com/yahoo/messagebus/routing/RoutingSpecTestCase.java index 3cbb9d86e44..ada8fcfce29 100755 --- a/messagebus/src/test/java/com/yahoo/messagebus/routing/RoutingSpecTestCase.java +++ b/messagebus/src/test/java/com/yahoo/messagebus/routing/RoutingSpecTestCase.java @@ -1,336 +1,336 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.messagebus.routing;
-
-import com.yahoo.messagebus.ConfigAgent;
-import com.yahoo.messagebus.ConfigHandler;
-import junit.framework.TestCase;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
- */
-public class RoutingSpecTestCase extends TestCase {
-
- ////////////////////////////////////////////////////////////////////////////////
- //
- // Tests
- //
- ////////////////////////////////////////////////////////////////////////////////
-
- public void testConfig() {
- assertConfig(new RoutingSpec());
- assertConfig(new RoutingSpec().addTable(new RoutingTableSpec("mytable1")));
- assertConfig(new RoutingSpec().addTable(new RoutingTableSpec("mytable1")
- .addHop(new HopSpec("myhop1", "myselector1"))));
- assertConfig(new RoutingSpec().addTable(new RoutingTableSpec("mytable1")
- .addHop(new HopSpec("myhop1", "myselector1"))
- .addRoute(new RouteSpec("myroute1").addHop("myhop1"))));
- assertConfig(new RoutingSpec().addTable(new RoutingTableSpec("mytable1")
- .addHop(new HopSpec("myhop1", "myselector1"))
- .addHop(new HopSpec("myhop2", "myselector2"))
- .addRoute(new RouteSpec("myroute1").addHop("myhop1"))
- .addRoute(new RouteSpec("myroute2").addHop("myhop2"))
- .addRoute(new RouteSpec("myroute12").addHop("myhop1").addHop("myhop2"))));
- assertConfig(new RoutingSpec().addTable(new RoutingTableSpec("mytable1")
- .addHop(new HopSpec("myhop1", "myselector1"))
- .addHop(new HopSpec("myhop2", "myselector2"))
- .addRoute(new RouteSpec("myroute1").addHop("myhop1"))
- .addRoute(new RouteSpec("myroute2").addHop("myhop2"))
- .addRoute(new RouteSpec("myroute12").addHop("myhop1").addHop("myhop2")))
- .addTable(new RoutingTableSpec("mytable2")));
- assertEquals("routingtable[2]\n" +
- "routingtable[0].protocol \"mytable1\"\n" +
- "routingtable[1].protocol \"mytable2\"\n" +
- "routingtable[1].hop[3]\n" +
- "routingtable[1].hop[0].name \"myhop1\"\n" +
- "routingtable[1].hop[0].selector \"myselector1\"\n" +
- "routingtable[1].hop[1].name \"myhop2\"\n" +
- "routingtable[1].hop[1].selector \"myselector2\"\n" +
- "routingtable[1].hop[1].ignoreresult true\n" +
- "routingtable[1].hop[2].name \"myhop1\"\n" +
- "routingtable[1].hop[2].selector \"myselector3\"\n" +
- "routingtable[1].hop[2].recipient[2]\n" +
- "routingtable[1].hop[2].recipient[0] \"myrecipient1\"\n" +
- "routingtable[1].hop[2].recipient[1] \"myrecipient2\"\n" +
- "routingtable[1].route[1]\n" +
- "routingtable[1].route[0].name \"myroute1\"\n" +
- "routingtable[1].route[0].hop[1]\n" +
- "routingtable[1].route[0].hop[0] \"myhop1\"\n",
- new RoutingSpec()
- .addTable(new RoutingTableSpec("mytable1"))
- .addTable(new RoutingTableSpec("mytable2")
- .addHop(new HopSpec("myhop1", "myselector1"))
- .addHop(new HopSpec("myhop2", "myselector2").setIgnoreResult(true))
- .addHop(new HopSpec("myhop1", "myselector3")
- .addRecipient("myrecipient1")
- .addRecipient("myrecipient2"))
- .addRoute(new RouteSpec("myroute1").addHop("myhop1"))).toString());
- }
-
- public void testApplicationSpec() {
- assertApplicationSpec(Arrays.asList("foo"),
- Arrays.asList("foo",
- "*"));
- assertApplicationSpec(Arrays.asList("foo/bar"),
- Arrays.asList("foo/bar",
- "foo/*",
- "*/bar",
- "*/*"));
- assertApplicationSpec(Arrays.asList("foo/0/baz",
- "foo/1/baz",
- "foo/2/baz"),
- Arrays.asList("foo/0/baz",
- "foo/1/baz",
- "foo/2/baz",
- "foo/0/*",
- "foo/1/*",
- "foo/2/*",
- "foo/*/baz",
- "*/0/baz",
- "*/1/baz",
- "*/2/baz",
- "foo/*/*",
- "*/0/*",
- "*/1/*",
- "*/2/*",
- "*/*/baz",
- "*/*/*"));
- }
-
- public void testVeriyfOk() {
- assertVerifyOk(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
- .addHop(new HopSpec("hop1", "myservice1"))),
- new ApplicationSpec().addService("mytable", "myservice1"));
- assertVerifyOk(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
- .addRoute(new RouteSpec("route1").addHop("myservice1"))),
- new ApplicationSpec().addService("mytable", "myservice1"));
- assertVerifyOk(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
- .addHop(new HopSpec("hop1", "myservice1"))
- .addRoute(new RouteSpec("route1").addHop("hop1"))),
- new ApplicationSpec().addService("mytable", "myservice1"));
- assertVerifyOk(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
- .addHop(new HopSpec("hop1", "route:route2"))
- .addHop(new HopSpec("hop2", "myservice1"))
- .addRoute(new RouteSpec("route1").addHop("hop1"))
- .addRoute(new RouteSpec("route2").addHop("hop2"))),
- new ApplicationSpec().addService("mytable", "myservice1"));
- assertVerifyOk(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
- .addHop(new HopSpec("myhop1", "foo/[bar]/baz").addRecipient("foo/0/baz").addRecipient("foo/1/baz"))),
- new ApplicationSpec()
- .addService("mytable", "foo/0/baz")
- .addService("mytable", "foo/1/baz"));
- }
-
- public void testVerifyToggle() {
- assertVerifyOk(new RoutingSpec(false)
- .addTable(new RoutingTableSpec("mytable"))
- .addTable(new RoutingTableSpec("mytable")),
- new ApplicationSpec());
- assertVerifyOk(new RoutingSpec().addTable(new RoutingTableSpec("mytable", false)
- .addHop(new HopSpec("foo", "bar"))
- .addHop(new HopSpec("foo", "baz"))),
- new ApplicationSpec());
- assertVerifyOk(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
- .addHop(new HopSpec("foo", "", false))),
- new ApplicationSpec());
- assertVerifyOk(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
- .addRoute(new RouteSpec("foo", false))),
- new ApplicationSpec());
- }
-
- public void testVerifyFail() {
- // Duplicate table.
- assertVerifyFail(new RoutingSpec()
- .addTable(new RoutingTableSpec("mytable"))
- .addTable(new RoutingTableSpec("mytable")),
- new ApplicationSpec(),
- Arrays.asList("Routing table 'mytable' is defined 2 times."));
-
- // Duplicate hop.
- assertVerifyFail(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
- .addHop(new HopSpec("foo", "bar"))
- .addHop(new HopSpec("foo", "baz"))),
- new ApplicationSpec()
- .addService("mytable", "bar")
- .addService("mytable", "baz"),
- Arrays.asList("Hop 'foo' in routing table 'mytable' is defined 2 times."));
-
- // Duplicate route.
- assertVerifyFail(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
- .addRoute(new RouteSpec("foo").addHop("bar"))
- .addRoute(new RouteSpec("foo").addHop("baz"))),
- new ApplicationSpec()
- .addService("mytable", "bar")
- .addService("mytable", "baz"),
- Arrays.asList("Route 'foo' in routing table 'mytable' is defined 2 times."));
-
- // Empty hop.
- assertVerifyFail(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
- .addHop(new HopSpec("foo", ""))),
- new ApplicationSpec(),
- Arrays.asList("For hop 'foo' in routing table 'mytable'; Failed to parse empty string."));
-
- // Empty route.
- assertVerifyFail(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
- .addRoute(new RouteSpec("foo"))),
- new ApplicationSpec(),
- Arrays.asList("Route 'foo' in routing table 'mytable' has no hops."));
-
- // Hop error.
- assertVerifyFail(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
- .addHop(new HopSpec("foo", "bar/baz cox"))),
- new ApplicationSpec(),
- Arrays.asList("For hop 'foo' in routing table 'mytable'; Failed to completely parse 'bar/baz cox'."));
-
- // Hop error in recipient.
- assertVerifyFail(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
- .addHop(new HopSpec("foo", "[bar]").addRecipient("bar/baz cox"))),
- new ApplicationSpec(),
- Arrays.asList("For recipient 'bar/baz cox' in hop 'foo' in routing table 'mytable'; Failed to completely parse 'bar/baz cox'."));
-
- // Hop error in route.
- assertVerifyFail(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
- .addRoute(new RouteSpec("foo").addHop("bar/baz cox"))),
- new ApplicationSpec(),
- Arrays.asList("For hop 1 in route 'foo' in routing table 'mytable'; Failed to completely parse 'bar/baz cox'."));
-
- // Hop not found.
- assertVerifyFail(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
- .addRoute(new RouteSpec("foo").addHop("bar"))),
- new ApplicationSpec(),
- Arrays.asList("Hop 1 in route 'foo' in routing table 'mytable' references 'bar' which is neither a service, a route nor another hop."));
-
- // Mismatched recipient.
- assertVerifyFail(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
- .addHop(new HopSpec("foo", "bar/[baz]/cox").addRecipient("cox/0/bar"))),
- new ApplicationSpec(),
- Arrays.asList("Selector 'bar/[baz]/cox' does not match recipient 'cox/0/bar' in hop 'foo' in routing table 'mytable'."));
-
- // Route not found.
- assertVerifyFail(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
- .addHop(new HopSpec("foo", "route:bar"))),
- new ApplicationSpec(),
- Arrays.asList("Hop 'foo' in routing table 'mytable' references route 'bar' which does not exist."));
-
- // Route not found in route.
- assertVerifyFail(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
- .addRoute(new RouteSpec("foo").addHop("route:bar"))),
- new ApplicationSpec(),
- Arrays.asList("Hop 1 in route 'foo' in routing table 'mytable' references route 'bar' which does not exist."));
-
- // Service not found.
- assertVerifyFail(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
- .addHop(new HopSpec("foo", "bar/baz"))),
- new ApplicationSpec(),
- Arrays.asList("Hop 'foo' in routing table 'mytable' references 'bar/baz' which is neither a service, a route nor another hop."));
-
- // Unexpected recipient.
- assertVerifyFail(new RoutingSpec().addTable(new RoutingTableSpec("mytable")
- .addHop(new HopSpec("foo", "bar").addRecipient("baz"))),
- new ApplicationSpec()
- .addService("mytable", "bar")
- .addService("mytable", "baz"),
- Arrays.asList("Hop 'foo' in routing table 'mytable' has recipients but no policy directive."));
-
- // Multiple errors.
- assertVerifyFail(new RoutingSpec()
- .addTable(new RoutingTableSpec("mytable"))
- .addTable(new RoutingTableSpec("mytable")
- .addHop(new HopSpec("hop1", "bar"))
- .addHop(new HopSpec("hop1", "baz"))
- .addHop(new HopSpec("hop2", ""))
- .addHop(new HopSpec("hop3", "bar/baz cox"))
- .addHop(new HopSpec("hop4", "[bar]").addRecipient("bar/baz cox"))
- .addHop(new HopSpec("hop5", "bar/[baz]/cox").addRecipient("cox/0/bar"))
- .addHop(new HopSpec("hop6", "route:route69"))
- .addHop(new HopSpec("hop7", "bar/baz"))
- .addHop(new HopSpec("hop8", "bar").addRecipient("baz"))
- .addRoute(new RouteSpec("route1").addHop("bar"))
- .addRoute(new RouteSpec("route1").addHop("baz"))
- .addRoute(new RouteSpec("route2").addHop(""))
- .addRoute(new RouteSpec("route3").addHop("bar/baz cox"))
- .addRoute(new RouteSpec("route4").addHop("hop69"))
- .addRoute(new RouteSpec("route5").addHop("route:route69"))),
- new ApplicationSpec()
- .addService("mytable", "bar")
- .addService("mytable", "baz"),
- Arrays.asList("Routing table 'mytable' is defined 2 times.",
- "For hop 'hop2' in routing table 'mytable'; Failed to parse empty string.",
- "For hop 'hop3' in routing table 'mytable'; Failed to completely parse 'bar/baz cox'.",
- "For hop 1 in route 'route2' in routing table 'mytable'; Failed to parse empty string.",
- "For hop 1 in route 'route3' in routing table 'mytable'; Failed to completely parse 'bar/baz cox'.",
- "For recipient 'bar/baz cox' in hop 'hop4' in routing table 'mytable'; Failed to completely parse 'bar/baz cox'.",
- "Hop 'hop1' in routing table 'mytable' is defined 2 times.",
- "Hop 'hop6' in routing table 'mytable' references route 'route69' which does not exist.",
- "Hop 'hop7' in routing table 'mytable' references 'bar/baz' which is neither a service, a route nor another hop.",
- "Hop 'hop8' in routing table 'mytable' has recipients but no policy directive.",
- "Hop 1 in route 'route4' in routing table 'mytable' references 'hop69' which is neither a service, a route nor another hop.",
- "Hop 1 in route 'route5' in routing table 'mytable' references route 'route69' which does not exist.",
- "Route 'route1' in routing table 'mytable' is defined 2 times.",
- "Selector 'bar/[baz]/cox' does not match recipient 'cox/0/bar' in hop 'hop5' in routing table 'mytable'."));
- }
-
- ////////////////////////////////////////////////////////////////////////////////
- //
- // Utilities
- //
- ////////////////////////////////////////////////////////////////////////////////
-
- private static void assertVerifyOk(RoutingSpec routing, ApplicationSpec app) {
- assertVerifyFail(routing, app, new ArrayList<String>());
- }
-
- private static void assertVerifyFail(RoutingSpec routing, ApplicationSpec app, List<String> expectedErrors) {
- List<String> errors = new ArrayList<>();
- routing.verify(app, errors);
-
- Collections.sort(errors);
- Collections.sort(expectedErrors);
- assertEquals(expectedErrors.toString(), errors.toString());
- }
-
- private static void assertConfig(RoutingSpec routing) {
- assertEquals(routing, routing);
- assertEquals(routing, new RoutingSpec(routing));
-
- ConfigStore store = new ConfigStore();
- ConfigAgent subscriber = new ConfigAgent("raw:" + routing.toString(), store);
- subscriber.subscribe();
- assertTrue(store.routing.equals(routing));
- }
-
- private static void assertApplicationSpec(List<String> services, List<String> patterns) {
- ApplicationSpec app = new ApplicationSpec();
- for (String pattern : patterns) {
- assertFalse(app.isService("foo", pattern));
- assertFalse(app.isService("bar", pattern));
- }
- for (String service : services) {
- app.addService("foo", service);
- }
- for (String pattern : patterns) {
- assertTrue(app.isService("foo", pattern));
- assertFalse(app.isService("bar", pattern));
- }
- for (String service : services) {
- app.addService("bar", service);
- }
- for (String pattern : patterns) {
- assertTrue(app.isService("foo", pattern));
- assertTrue(app.isService("bar", pattern));
- }
- }
-
- private static class ConfigStore implements ConfigHandler {
-
- RoutingSpec routing = null;
-
- public void setupRouting(RoutingSpec routing) {
- this.routing = routing;
- }
- }
-}
+package com.yahoo.messagebus.routing; + +import com.yahoo.messagebus.ConfigAgent; +import com.yahoo.messagebus.ConfigHandler; +import junit.framework.TestCase; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +/** + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +public class RoutingSpecTestCase extends TestCase { + + //////////////////////////////////////////////////////////////////////////////// + // + // Tests + // + //////////////////////////////////////////////////////////////////////////////// + + public void testConfig() { + assertConfig(new RoutingSpec()); + assertConfig(new RoutingSpec().addTable(new RoutingTableSpec("mytable1"))); + assertConfig(new RoutingSpec().addTable(new RoutingTableSpec("mytable1") + .addHop(new HopSpec("myhop1", "myselector1")))); + assertConfig(new RoutingSpec().addTable(new RoutingTableSpec("mytable1") + .addHop(new HopSpec("myhop1", "myselector1")) + .addRoute(new RouteSpec("myroute1").addHop("myhop1")))); + assertConfig(new RoutingSpec().addTable(new RoutingTableSpec("mytable1") + .addHop(new HopSpec("myhop1", "myselector1")) + .addHop(new HopSpec("myhop2", "myselector2")) + .addRoute(new RouteSpec("myroute1").addHop("myhop1")) + .addRoute(new RouteSpec("myroute2").addHop("myhop2")) + .addRoute(new RouteSpec("myroute12").addHop("myhop1").addHop("myhop2")))); + assertConfig(new RoutingSpec().addTable(new RoutingTableSpec("mytable1") + .addHop(new HopSpec("myhop1", "myselector1")) + .addHop(new HopSpec("myhop2", "myselector2")) + .addRoute(new RouteSpec("myroute1").addHop("myhop1")) + .addRoute(new RouteSpec("myroute2").addHop("myhop2")) + .addRoute(new RouteSpec("myroute12").addHop("myhop1").addHop("myhop2"))) + .addTable(new RoutingTableSpec("mytable2"))); + assertEquals("routingtable[2]\n" + + "routingtable[0].protocol \"mytable1\"\n" + + "routingtable[1].protocol \"mytable2\"\n" + + "routingtable[1].hop[3]\n" + + "routingtable[1].hop[0].name \"myhop1\"\n" + + "routingtable[1].hop[0].selector \"myselector1\"\n" + + "routingtable[1].hop[1].name \"myhop2\"\n" + + "routingtable[1].hop[1].selector \"myselector2\"\n" + + "routingtable[1].hop[1].ignoreresult true\n" + + "routingtable[1].hop[2].name \"myhop1\"\n" + + "routingtable[1].hop[2].selector \"myselector3\"\n" + + "routingtable[1].hop[2].recipient[2]\n" + + "routingtable[1].hop[2].recipient[0] \"myrecipient1\"\n" + + "routingtable[1].hop[2].recipient[1] \"myrecipient2\"\n" + + "routingtable[1].route[1]\n" + + "routingtable[1].route[0].name \"myroute1\"\n" + + "routingtable[1].route[0].hop[1]\n" + + "routingtable[1].route[0].hop[0] \"myhop1\"\n", + new RoutingSpec() + .addTable(new RoutingTableSpec("mytable1")) + .addTable(new RoutingTableSpec("mytable2") + .addHop(new HopSpec("myhop1", "myselector1")) + .addHop(new HopSpec("myhop2", "myselector2").setIgnoreResult(true)) + .addHop(new HopSpec("myhop1", "myselector3") + .addRecipient("myrecipient1") + .addRecipient("myrecipient2")) + .addRoute(new RouteSpec("myroute1").addHop("myhop1"))).toString()); + } + + public void testApplicationSpec() { + assertApplicationSpec(Arrays.asList("foo"), + Arrays.asList("foo", + "*")); + assertApplicationSpec(Arrays.asList("foo/bar"), + Arrays.asList("foo/bar", + "foo/*", + "*/bar", + "*/*")); + assertApplicationSpec(Arrays.asList("foo/0/baz", + "foo/1/baz", + "foo/2/baz"), + Arrays.asList("foo/0/baz", + "foo/1/baz", + "foo/2/baz", + "foo/0/*", + "foo/1/*", + "foo/2/*", + "foo/*/baz", + "*/0/baz", + "*/1/baz", + "*/2/baz", + "foo/*/*", + "*/0/*", + "*/1/*", + "*/2/*", + "*/*/baz", + "*/*/*")); + } + + public void testVeriyfOk() { + assertVerifyOk(new RoutingSpec().addTable(new RoutingTableSpec("mytable") + .addHop(new HopSpec("hop1", "myservice1"))), + new ApplicationSpec().addService("mytable", "myservice1")); + assertVerifyOk(new RoutingSpec().addTable(new RoutingTableSpec("mytable") + .addRoute(new RouteSpec("route1").addHop("myservice1"))), + new ApplicationSpec().addService("mytable", "myservice1")); + assertVerifyOk(new RoutingSpec().addTable(new RoutingTableSpec("mytable") + .addHop(new HopSpec("hop1", "myservice1")) + .addRoute(new RouteSpec("route1").addHop("hop1"))), + new ApplicationSpec().addService("mytable", "myservice1")); + assertVerifyOk(new RoutingSpec().addTable(new RoutingTableSpec("mytable") + .addHop(new HopSpec("hop1", "route:route2")) + .addHop(new HopSpec("hop2", "myservice1")) + .addRoute(new RouteSpec("route1").addHop("hop1")) + .addRoute(new RouteSpec("route2").addHop("hop2"))), + new ApplicationSpec().addService("mytable", "myservice1")); + assertVerifyOk(new RoutingSpec().addTable(new RoutingTableSpec("mytable") + .addHop(new HopSpec("myhop1", "foo/[bar]/baz").addRecipient("foo/0/baz").addRecipient("foo/1/baz"))), + new ApplicationSpec() + .addService("mytable", "foo/0/baz") + .addService("mytable", "foo/1/baz")); + } + + public void testVerifyToggle() { + assertVerifyOk(new RoutingSpec(false) + .addTable(new RoutingTableSpec("mytable")) + .addTable(new RoutingTableSpec("mytable")), + new ApplicationSpec()); + assertVerifyOk(new RoutingSpec().addTable(new RoutingTableSpec("mytable", false) + .addHop(new HopSpec("foo", "bar")) + .addHop(new HopSpec("foo", "baz"))), + new ApplicationSpec()); + assertVerifyOk(new RoutingSpec().addTable(new RoutingTableSpec("mytable") + .addHop(new HopSpec("foo", "", false))), + new ApplicationSpec()); + assertVerifyOk(new RoutingSpec().addTable(new RoutingTableSpec("mytable") + .addRoute(new RouteSpec("foo", false))), + new ApplicationSpec()); + } + + public void testVerifyFail() { + // Duplicate table. + assertVerifyFail(new RoutingSpec() + .addTable(new RoutingTableSpec("mytable")) + .addTable(new RoutingTableSpec("mytable")), + new ApplicationSpec(), + Arrays.asList("Routing table 'mytable' is defined 2 times.")); + + // Duplicate hop. + assertVerifyFail(new RoutingSpec().addTable(new RoutingTableSpec("mytable") + .addHop(new HopSpec("foo", "bar")) + .addHop(new HopSpec("foo", "baz"))), + new ApplicationSpec() + .addService("mytable", "bar") + .addService("mytable", "baz"), + Arrays.asList("Hop 'foo' in routing table 'mytable' is defined 2 times.")); + + // Duplicate route. + assertVerifyFail(new RoutingSpec().addTable(new RoutingTableSpec("mytable") + .addRoute(new RouteSpec("foo").addHop("bar")) + .addRoute(new RouteSpec("foo").addHop("baz"))), + new ApplicationSpec() + .addService("mytable", "bar") + .addService("mytable", "baz"), + Arrays.asList("Route 'foo' in routing table 'mytable' is defined 2 times.")); + + // Empty hop. + assertVerifyFail(new RoutingSpec().addTable(new RoutingTableSpec("mytable") + .addHop(new HopSpec("foo", ""))), + new ApplicationSpec(), + Arrays.asList("For hop 'foo' in routing table 'mytable'; Failed to parse empty string.")); + + // Empty route. + assertVerifyFail(new RoutingSpec().addTable(new RoutingTableSpec("mytable") + .addRoute(new RouteSpec("foo"))), + new ApplicationSpec(), + Arrays.asList("Route 'foo' in routing table 'mytable' has no hops.")); + + // Hop error. + assertVerifyFail(new RoutingSpec().addTable(new RoutingTableSpec("mytable") + .addHop(new HopSpec("foo", "bar/baz cox"))), + new ApplicationSpec(), + Arrays.asList("For hop 'foo' in routing table 'mytable'; Failed to completely parse 'bar/baz cox'.")); + + // Hop error in recipient. + assertVerifyFail(new RoutingSpec().addTable(new RoutingTableSpec("mytable") + .addHop(new HopSpec("foo", "[bar]").addRecipient("bar/baz cox"))), + new ApplicationSpec(), + Arrays.asList("For recipient 'bar/baz cox' in hop 'foo' in routing table 'mytable'; Failed to completely parse 'bar/baz cox'.")); + + // Hop error in route. + assertVerifyFail(new RoutingSpec().addTable(new RoutingTableSpec("mytable") + .addRoute(new RouteSpec("foo").addHop("bar/baz cox"))), + new ApplicationSpec(), + Arrays.asList("For hop 1 in route 'foo' in routing table 'mytable'; Failed to completely parse 'bar/baz cox'.")); + + // Hop not found. + assertVerifyFail(new RoutingSpec().addTable(new RoutingTableSpec("mytable") + .addRoute(new RouteSpec("foo").addHop("bar"))), + new ApplicationSpec(), + Arrays.asList("Hop 1 in route 'foo' in routing table 'mytable' references 'bar' which is neither a service, a route nor another hop.")); + + // Mismatched recipient. + assertVerifyFail(new RoutingSpec().addTable(new RoutingTableSpec("mytable") + .addHop(new HopSpec("foo", "bar/[baz]/cox").addRecipient("cox/0/bar"))), + new ApplicationSpec(), + Arrays.asList("Selector 'bar/[baz]/cox' does not match recipient 'cox/0/bar' in hop 'foo' in routing table 'mytable'.")); + + // Route not found. + assertVerifyFail(new RoutingSpec().addTable(new RoutingTableSpec("mytable") + .addHop(new HopSpec("foo", "route:bar"))), + new ApplicationSpec(), + Arrays.asList("Hop 'foo' in routing table 'mytable' references route 'bar' which does not exist.")); + + // Route not found in route. + assertVerifyFail(new RoutingSpec().addTable(new RoutingTableSpec("mytable") + .addRoute(new RouteSpec("foo").addHop("route:bar"))), + new ApplicationSpec(), + Arrays.asList("Hop 1 in route 'foo' in routing table 'mytable' references route 'bar' which does not exist.")); + + // Service not found. + assertVerifyFail(new RoutingSpec().addTable(new RoutingTableSpec("mytable") + .addHop(new HopSpec("foo", "bar/baz"))), + new ApplicationSpec(), + Arrays.asList("Hop 'foo' in routing table 'mytable' references 'bar/baz' which is neither a service, a route nor another hop.")); + + // Unexpected recipient. + assertVerifyFail(new RoutingSpec().addTable(new RoutingTableSpec("mytable") + .addHop(new HopSpec("foo", "bar").addRecipient("baz"))), + new ApplicationSpec() + .addService("mytable", "bar") + .addService("mytable", "baz"), + Arrays.asList("Hop 'foo' in routing table 'mytable' has recipients but no policy directive.")); + + // Multiple errors. + assertVerifyFail(new RoutingSpec() + .addTable(new RoutingTableSpec("mytable")) + .addTable(new RoutingTableSpec("mytable") + .addHop(new HopSpec("hop1", "bar")) + .addHop(new HopSpec("hop1", "baz")) + .addHop(new HopSpec("hop2", "")) + .addHop(new HopSpec("hop3", "bar/baz cox")) + .addHop(new HopSpec("hop4", "[bar]").addRecipient("bar/baz cox")) + .addHop(new HopSpec("hop5", "bar/[baz]/cox").addRecipient("cox/0/bar")) + .addHop(new HopSpec("hop6", "route:route69")) + .addHop(new HopSpec("hop7", "bar/baz")) + .addHop(new HopSpec("hop8", "bar").addRecipient("baz")) + .addRoute(new RouteSpec("route1").addHop("bar")) + .addRoute(new RouteSpec("route1").addHop("baz")) + .addRoute(new RouteSpec("route2").addHop("")) + .addRoute(new RouteSpec("route3").addHop("bar/baz cox")) + .addRoute(new RouteSpec("route4").addHop("hop69")) + .addRoute(new RouteSpec("route5").addHop("route:route69"))), + new ApplicationSpec() + .addService("mytable", "bar") + .addService("mytable", "baz"), + Arrays.asList("Routing table 'mytable' is defined 2 times.", + "For hop 'hop2' in routing table 'mytable'; Failed to parse empty string.", + "For hop 'hop3' in routing table 'mytable'; Failed to completely parse 'bar/baz cox'.", + "For hop 1 in route 'route2' in routing table 'mytable'; Failed to parse empty string.", + "For hop 1 in route 'route3' in routing table 'mytable'; Failed to completely parse 'bar/baz cox'.", + "For recipient 'bar/baz cox' in hop 'hop4' in routing table 'mytable'; Failed to completely parse 'bar/baz cox'.", + "Hop 'hop1' in routing table 'mytable' is defined 2 times.", + "Hop 'hop6' in routing table 'mytable' references route 'route69' which does not exist.", + "Hop 'hop7' in routing table 'mytable' references 'bar/baz' which is neither a service, a route nor another hop.", + "Hop 'hop8' in routing table 'mytable' has recipients but no policy directive.", + "Hop 1 in route 'route4' in routing table 'mytable' references 'hop69' which is neither a service, a route nor another hop.", + "Hop 1 in route 'route5' in routing table 'mytable' references route 'route69' which does not exist.", + "Route 'route1' in routing table 'mytable' is defined 2 times.", + "Selector 'bar/[baz]/cox' does not match recipient 'cox/0/bar' in hop 'hop5' in routing table 'mytable'.")); + } + + //////////////////////////////////////////////////////////////////////////////// + // + // Utilities + // + //////////////////////////////////////////////////////////////////////////////// + + private static void assertVerifyOk(RoutingSpec routing, ApplicationSpec app) { + assertVerifyFail(routing, app, new ArrayList<String>()); + } + + private static void assertVerifyFail(RoutingSpec routing, ApplicationSpec app, List<String> expectedErrors) { + List<String> errors = new ArrayList<>(); + routing.verify(app, errors); + + Collections.sort(errors); + Collections.sort(expectedErrors); + assertEquals(expectedErrors.toString(), errors.toString()); + } + + private static void assertConfig(RoutingSpec routing) { + assertEquals(routing, routing); + assertEquals(routing, new RoutingSpec(routing)); + + ConfigStore store = new ConfigStore(); + ConfigAgent subscriber = new ConfigAgent("raw:" + routing.toString(), store); + subscriber.subscribe(); + assertTrue(store.routing.equals(routing)); + } + + private static void assertApplicationSpec(List<String> services, List<String> patterns) { + ApplicationSpec app = new ApplicationSpec(); + for (String pattern : patterns) { + assertFalse(app.isService("foo", pattern)); + assertFalse(app.isService("bar", pattern)); + } + for (String service : services) { + app.addService("foo", service); + } + for (String pattern : patterns) { + assertTrue(app.isService("foo", pattern)); + assertFalse(app.isService("bar", pattern)); + } + for (String service : services) { + app.addService("bar", service); + } + for (String pattern : patterns) { + assertTrue(app.isService("foo", pattern)); + assertTrue(app.isService("bar", pattern)); + } + } + + private static class ConfigStore implements ConfigHandler { + + RoutingSpec routing = null; + + public void setupRouting(RoutingSpec routing) { + this.routing = routing; + } + } +} |