diff options
author | Jon Bratseth <bratseth@verizonmedia.com> | 2019-01-25 14:37:22 +0100 |
---|---|---|
committer | Jon Bratseth <bratseth@verizonmedia.com> | 2019-01-25 14:37:22 +0100 |
commit | c6762531a7b63dcd5cac48dabb80fc268e9fc934 (patch) | |
tree | ea4ef1aab40717395b0c5b83d2a6e00dd40413ef /messagebus | |
parent | 8a864f26c96240f829f1a65d9105e3c8734f2d92 (diff) |
Nonfunctional changes only
Diffstat (limited to 'messagebus')
7 files changed, 71 insertions, 68 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/AllPassThrottlePolicy.java b/messagebus/src/main/java/com/yahoo/messagebus/AllPassThrottlePolicy.java index 8cf53e47717..a7753a7401e 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/AllPassThrottlePolicy.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/AllPassThrottlePolicy.java @@ -3,10 +3,11 @@ package com.yahoo.messagebus; /** * This is an implementation of the {@link ThrottlePolicy} that passes all requests (no real throttling). + * * @author dybis */ -public class AllPassThrottlePolicy implements ThrottlePolicy -{ +public class AllPassThrottlePolicy implements ThrottlePolicy { + @Override public boolean canSend(Message msg, int pendingCount) { return true; @@ -19,4 +20,5 @@ public class AllPassThrottlePolicy implements ThrottlePolicy @Override public void processReply(Reply reply) { } + } diff --git a/messagebus/src/main/java/com/yahoo/messagebus/DynamicThrottlePolicy.java b/messagebus/src/main/java/com/yahoo/messagebus/DynamicThrottlePolicy.java index 41a192a24be..dae20543b34 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/DynamicThrottlePolicy.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/DynamicThrottlePolicy.java @@ -7,7 +7,7 @@ import com.yahoo.log.LogLevel; import java.util.logging.Logger; /** - * This is an implementatin of the {@link ThrottlePolicy} that offers dynamic limits to the number of pending messages a + * This is an implementation of the {@link ThrottlePolicy} that offers dynamic limits to the number of pending messages a * {@link SourceSession} is allowed to have. * * <b>NOTE:</b> By context, "pending" is refering to the number of sent messages that have not been replied to yet. @@ -44,7 +44,7 @@ public class DynamicThrottlePolicy extends StaticThrottlePolicy { /** * Constructs a new instance of this class using the given clock to calculate efficiency. * - * @param timer The timer to use. + * @param timer the timer to use */ public DynamicThrottlePolicy(Timer timer) { this.timer = timer; @@ -64,8 +64,8 @@ public class DynamicThrottlePolicy extends StaticThrottlePolicy { } @Override - public boolean canSend(Message msg, int pendingCount) { - if (!super.canSend(msg, pendingCount)) { + public boolean canSend(Message message, int pendingCount) { + if ( ! super.canSend(message, pendingCount)) { return false; } long time = timer.milliTime(); @@ -78,8 +78,8 @@ public class DynamicThrottlePolicy extends StaticThrottlePolicy { } @Override - public void processMessage(Message msg) { - super.processMessage(msg); + public void processMessage(Message message) { + super.processMessage(message); if (++numSent < windowSize * resizeRate) { return; } @@ -126,7 +126,7 @@ public class DynamicThrottlePolicy extends StaticThrottlePolicy { @Override public void processReply(Reply reply) { super.processReply(reply); - if (!reply.hasErrors()) { + if ( ! reply.hasErrors()) { ++numOk; } } @@ -136,8 +136,8 @@ public class DynamicThrottlePolicy extends StaticThrottlePolicy { * the correlation between throughput and window size. The algorithm will increase the window size until efficiency * drops below the efficiency of the local maxima times this value. * - * @param efficiencyThreshold The limit to set. - * @return This, to allow chaining. + * @param efficiencyThreshold the limit to set + * @return this, to allow chaining * @see #setWindowSizeBackOff(double) */ public DynamicThrottlePolicy setEfficiencyThreshold(double efficiencyThreshold) { @@ -148,8 +148,8 @@ public class DynamicThrottlePolicy extends StaticThrottlePolicy { /** * Sets the step size used when increasing window size. * - * @param windowSizeIncrement The step size to set. - * @return This, to allow chaining. + * @param windowSizeIncrement the step size to set + * @return this, to allow chaining */ public DynamicThrottlePolicy setWindowSizeIncrement(double windowSizeIncrement) { this.windowSizeIncrement = windowSizeIncrement; @@ -161,8 +161,8 @@ public class DynamicThrottlePolicy extends StaticThrottlePolicy { * A value of 1 means that there is no back off from the local maxima, and means that the algorithm will fail to * reduce window size to something lower than a previous maxima. This value is capped to the [0, 1] range. * - * @param windowSizeBackOff The back off to set. - * @return This, to allow chaining. + * @param windowSizeBackOff the back off to set + * @return this, to allow chaining */ public DynamicThrottlePolicy setWindowSizeBackOff(double windowSizeBackOff) { this.windowSizeBackOff = Math.max(0, Math.min(1, windowSizeBackOff)); @@ -173,8 +173,8 @@ public class DynamicThrottlePolicy extends StaticThrottlePolicy { * Sets the rate at which the window size is updated. The larger the value, the less responsive the resizing * becomes. However, the smaller the value, the less accurate the measurements become. * - * @param resizeRate The rate to set. - * @return This, to allow chaining. + * @param resizeRate the rate to set + * @return this, to allow chaining */ public DynamicThrottlePolicy setResizeRate(double resizeRate) { this.resizeRate = resizeRate; @@ -186,8 +186,8 @@ public class DynamicThrottlePolicy extends StaticThrottlePolicy { * will be allocated to this clients. Resources are shared between clients * proportiannally to their weights. * - * @param weight The weight to set. - * @return This, to allow chaining. + * @param weight the weight to set + * @return this, to allow chaining */ public DynamicThrottlePolicy setWeight(double weight) { this.weight = weight; @@ -198,8 +198,8 @@ public class DynamicThrottlePolicy extends StaticThrottlePolicy { * Sets the maximium number of pending operations allowed at any time, in * order to avoid using too much resources. * - * @param max The max to set. - * @return This, to allow chaining. + * @param max the max to set + * @return this, to allow chaining */ public DynamicThrottlePolicy setMaxWindowSize(double max) { this.maxWindowSize = max; @@ -209,7 +209,7 @@ public class DynamicThrottlePolicy extends StaticThrottlePolicy { /** * Get the maximum number of pending operations allowed at any time. * - * @return The maximum number of operations. + * @return the maximum number of operations */ public double getMaxWindowSize() { return maxWindowSize; @@ -220,8 +220,8 @@ public class DynamicThrottlePolicy extends StaticThrottlePolicy { * Sets the minimium number of pending operations allowed at any time, in * order to keep a level of performance. * - * @param min The min to set. - * @return This, to allow chaining. + * @param min the min to set + * @return this, to allow chaining */ public DynamicThrottlePolicy setMinWindowSize(double min) { this.minWindowSize = min; @@ -231,7 +231,7 @@ public class DynamicThrottlePolicy extends StaticThrottlePolicy { /** * Get the minimum number of pending operations allowed at any time. * - * @return The minimum number of operations. + * @return the minimum number of operations */ public double getMinWindowSize() { return minWindowSize; @@ -247,7 +247,7 @@ public class DynamicThrottlePolicy extends StaticThrottlePolicy { /** * Returns the maximum number of pending messages allowed. * - * @return The max limit. + * @return the max limit */ public int getMaxPendingCount() { return (int)windowSize; diff --git a/messagebus/src/main/java/com/yahoo/messagebus/RateThrottlingPolicy.java b/messagebus/src/main/java/com/yahoo/messagebus/RateThrottlingPolicy.java index 6b073b737db..029c6a8074c 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/RateThrottlingPolicy.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/RateThrottlingPolicy.java @@ -35,8 +35,8 @@ public class RateThrottlingPolicy extends StaticThrottlePolicy { currentPeriod = timer.milliTime() / PERIOD; } - public boolean canSend(Message msg, int pendingCount) { - if (!super.canSend(msg, pendingCount)) { + public boolean canSend(Message message, int pendingCount) { + if (!super.canSend(message, pendingCount)) { return false; } diff --git a/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java b/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java index 4fdfa341e3b..2a4a123a60b 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java @@ -112,11 +112,11 @@ public final class SourceSession implements ReplyHandler, MessageBus.SendBlocked * } while (!result.isAccepted()); * </code> * - * @param msg the message to send + * @param message the message to send * @return the result of <i>initiating</i> sending of this message */ - public Result send(Message msg) { - return sendInternal(updateTiming(msg)); + public Result send(Message message) { + return sendInternal(updateTiming(message)); } private Message updateTiming(Message msg) { @@ -127,29 +127,29 @@ public final class SourceSession implements ReplyHandler, MessageBus.SendBlocked return msg; } - private Result sendInternal(Message msg) { + private Result sendInternal(Message message) { synchronized (lock) { if (closed) { return new Result(ErrorCode.SEND_QUEUE_CLOSED, "Source session is closed."); } - if (throttlePolicy != null && ! throttlePolicy.canSend(msg, pendingCount)) { + if (throttlePolicy != null && ! throttlePolicy.canSend(message, pendingCount)) { return new Result(ErrorCode.SEND_QUEUE_FULL, "Too much pending data (" + pendingCount + " messages)."); } - msg.pushHandler(replyHandler); + message.pushHandler(replyHandler); if (throttlePolicy != null) { - throttlePolicy.processMessage(msg); + throttlePolicy.processMessage(message); } ++pendingCount; } - if (msg.getTrace().shouldTrace(TraceLevel.COMPONENT)) { - msg.getTrace().trace(TraceLevel.COMPONENT, - "Source session accepted a " + msg.getApproxSize() + " byte message. " + + if (message.getTrace().shouldTrace(TraceLevel.COMPONENT)) { + message.getTrace().trace(TraceLevel.COMPONENT, + "Source session accepted a " + message.getApproxSize() + " byte message. " + pendingCount + " message(s) now pending."); } - msg.pushHandler(this); - sequencer.handleMessage(msg); + message.pushHandler(this); + sequencer.handleMessage(message); return Result.ACCEPTED; } diff --git a/messagebus/src/main/java/com/yahoo/messagebus/SourceSessionParams.java b/messagebus/src/main/java/com/yahoo/messagebus/SourceSessionParams.java index e671a36fa26..e9796e6c4d2 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/SourceSessionParams.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/SourceSessionParams.java @@ -24,7 +24,7 @@ public class SourceSessionParams { /** * Implements the copy constructor. * - * @param params The object to copy. + * @param params the object to copy */ public SourceSessionParams(SourceSessionParams params) { throttlePolicy = params.throttlePolicy; @@ -32,11 +32,7 @@ public class SourceSessionParams { replyHandler = params.replyHandler; } - /** - * Returns the policy to use for throttling output. - * - * @return The policy. - */ + /** Returns the policy to use for throttling output. */ public ThrottlePolicy getThrottlePolicy() { return throttlePolicy; } @@ -45,7 +41,7 @@ public class SourceSessionParams { * Sets the policy to use for throttling output. * * @param throttlePolicy The policy to set. - * @return This, to allow chaining. + * @return this, to allow chaining */ public SourceSessionParams setThrottlePolicy(ThrottlePolicy throttlePolicy) { this.throttlePolicy = throttlePolicy; @@ -55,7 +51,7 @@ public class SourceSessionParams { /** * Returns the number of seconds a message can spend trying to succeed. * - * @return The timeout in seconds. + * @return the timeout in seconds */ public double getTimeout() { return timeout; @@ -66,7 +62,7 @@ public class SourceSessionParams { * for any message bus operation. * * @param timeout The numer of seconds allowed. - * @return This, to allow chaining. + * @return this, to allow chaining */ public SourceSessionParams setTimeout(double timeout) { this.timeout = timeout; @@ -76,7 +72,7 @@ public class SourceSessionParams { /** * Returns whether or not a reply handler has been assigned to this. * - * @return True if a handler is set. + * @return true if a handler is set */ boolean hasReplyHandler() { return replyHandler != null; @@ -85,7 +81,7 @@ public class SourceSessionParams { /** * Returns the handler to receive incoming replies. * - * @return The handler. + * @return the handler */ public ReplyHandler getReplyHandler() { return replyHandler; @@ -95,10 +91,11 @@ public class SourceSessionParams { * Sets the handler to recive incoming replies. * * @param handler The handler to set. - * @return This, to allow chaining. + * @return this, to allow chaining */ public SourceSessionParams setReplyHandler(ReplyHandler handler) { replyHandler = handler; return this; } + } diff --git a/messagebus/src/main/java/com/yahoo/messagebus/StaticThrottlePolicy.java b/messagebus/src/main/java/com/yahoo/messagebus/StaticThrottlePolicy.java index 7d73cd4bf3f..aa1bc1ce624 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/StaticThrottlePolicy.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/StaticThrottlePolicy.java @@ -2,7 +2,7 @@ package com.yahoo.messagebus; /** - * This is an implementatin of the {@link ThrottlePolicy} that offers static limits to the amount of pending data a + * This is an implementation of the {@link ThrottlePolicy} that offers static limits to the amount of pending data a * {@link SourceSession} is allowed to have. You may choose to set a limit to the total number of pending messages (by * way of {@link #setMaxPendingCount(int)}), the total size of pending messages (by way of {@link * #setMaxPendingSize(long)}), or some combination thereof. @@ -17,7 +17,8 @@ public class StaticThrottlePolicy implements ThrottlePolicy { private long maxPendingSize = 0; private long pendingSize = 0; - public boolean canSend(Message msg, int pendingCount) { + @Override + public boolean canSend(Message message, int pendingCount) { if (maxPendingCount > 0 && pendingCount >= maxPendingCount) { return false; } @@ -27,12 +28,14 @@ public class StaticThrottlePolicy implements ThrottlePolicy { return true; } - public void processMessage(Message msg) { - int size = msg.getApproxSize(); - msg.setContext(size); + @Override + public void processMessage(Message message) { + int size = message.getApproxSize(); + message.setContext(size); pendingSize += size; } + @Override public void processReply(Reply reply) { int size = (Integer)reply.getContext(); pendingSize -= size; @@ -41,7 +44,7 @@ public class StaticThrottlePolicy implements ThrottlePolicy { /** * Returns the maximum number of pending messages allowed. * - * @return The max limit. + * @return the max limit */ public int getMaxPendingCount() { return maxPendingCount; @@ -50,8 +53,8 @@ public class StaticThrottlePolicy implements ThrottlePolicy { /** * Sets the maximum number of pending messages allowed. * - * @param maxCount The max count. - * @return This, to allow chaining. + * @param maxCount The max count + * @return this, to allow chaining */ public StaticThrottlePolicy setMaxPendingCount(int maxCount) { maxPendingCount = maxCount; diff --git a/messagebus/src/main/java/com/yahoo/messagebus/ThrottlePolicy.java b/messagebus/src/main/java/com/yahoo/messagebus/ThrottlePolicy.java index 30a0b82f2cd..1c7595f2b5c 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/ThrottlePolicy.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/ThrottlePolicy.java @@ -14,23 +14,24 @@ public interface ThrottlePolicy { /** * Returns whether or not the given message can be sent according to the current state of this policy. * - * @param msg The message to evaluate. - * @param pendingCount The current number of pending messages. - * @return True to send the message. + * @param message the message to evaluate + * @param pendingCount the current number of pending messages + * @return true to send the message */ - public boolean canSend(Message msg, int pendingCount); + boolean canSend(Message message, int pendingCount); /** * This method is called once for every message that was accepted by {@link #canSend(Message, int)} and sent. * - * @param msg The message beint sent. + * @param message the message being sent */ - public void processMessage(Message msg); + void processMessage(Message message); /** * This method is called once for every reply that is received. * - * @param reply The reply received. + * @param reply the reply received */ - public void processReply(Reply reply); + void processReply(Reply reply); + } |