summaryrefslogtreecommitdiffstats
path: root/messagebus
diff options
context:
space:
mode:
authorJon Bratseth <bratseth@verizonmedia.com>2019-01-25 14:37:22 +0100
committerJon Bratseth <bratseth@verizonmedia.com>2019-01-25 14:37:22 +0100
commitc6762531a7b63dcd5cac48dabb80fc268e9fc934 (patch)
treeea4ef1aab40717395b0c5b83d2a6e00dd40413ef /messagebus
parent8a864f26c96240f829f1a65d9105e3c8734f2d92 (diff)
Nonfunctional changes only
Diffstat (limited to 'messagebus')
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/AllPassThrottlePolicy.java6
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/DynamicThrottlePolicy.java48
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/RateThrottlingPolicy.java4
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java24
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/SourceSessionParams.java21
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/StaticThrottlePolicy.java19
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/ThrottlePolicy.java17
7 files changed, 71 insertions, 68 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/AllPassThrottlePolicy.java b/messagebus/src/main/java/com/yahoo/messagebus/AllPassThrottlePolicy.java
index 8cf53e47717..a7753a7401e 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/AllPassThrottlePolicy.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/AllPassThrottlePolicy.java
@@ -3,10 +3,11 @@ package com.yahoo.messagebus;
/**
* This is an implementation of the {@link ThrottlePolicy} that passes all requests (no real throttling).
+ *
* @author dybis
*/
-public class AllPassThrottlePolicy implements ThrottlePolicy
-{
+public class AllPassThrottlePolicy implements ThrottlePolicy {
+
@Override
public boolean canSend(Message msg, int pendingCount) {
return true;
@@ -19,4 +20,5 @@ public class AllPassThrottlePolicy implements ThrottlePolicy
@Override
public void processReply(Reply reply) {
}
+
}
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/DynamicThrottlePolicy.java b/messagebus/src/main/java/com/yahoo/messagebus/DynamicThrottlePolicy.java
index 41a192a24be..dae20543b34 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/DynamicThrottlePolicy.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/DynamicThrottlePolicy.java
@@ -7,7 +7,7 @@ import com.yahoo.log.LogLevel;
import java.util.logging.Logger;
/**
- * This is an implementatin of the {@link ThrottlePolicy} that offers dynamic limits to the number of pending messages a
+ * This is an implementation of the {@link ThrottlePolicy} that offers dynamic limits to the number of pending messages a
* {@link SourceSession} is allowed to have.
*
* <b>NOTE:</b> By context, "pending" is refering to the number of sent messages that have not been replied to yet.
@@ -44,7 +44,7 @@ public class DynamicThrottlePolicy extends StaticThrottlePolicy {
/**
* Constructs a new instance of this class using the given clock to calculate efficiency.
*
- * @param timer The timer to use.
+ * @param timer the timer to use
*/
public DynamicThrottlePolicy(Timer timer) {
this.timer = timer;
@@ -64,8 +64,8 @@ public class DynamicThrottlePolicy extends StaticThrottlePolicy {
}
@Override
- public boolean canSend(Message msg, int pendingCount) {
- if (!super.canSend(msg, pendingCount)) {
+ public boolean canSend(Message message, int pendingCount) {
+ if ( ! super.canSend(message, pendingCount)) {
return false;
}
long time = timer.milliTime();
@@ -78,8 +78,8 @@ public class DynamicThrottlePolicy extends StaticThrottlePolicy {
}
@Override
- public void processMessage(Message msg) {
- super.processMessage(msg);
+ public void processMessage(Message message) {
+ super.processMessage(message);
if (++numSent < windowSize * resizeRate) {
return;
}
@@ -126,7 +126,7 @@ public class DynamicThrottlePolicy extends StaticThrottlePolicy {
@Override
public void processReply(Reply reply) {
super.processReply(reply);
- if (!reply.hasErrors()) {
+ if ( ! reply.hasErrors()) {
++numOk;
}
}
@@ -136,8 +136,8 @@ public class DynamicThrottlePolicy extends StaticThrottlePolicy {
* the correlation between throughput and window size. The algorithm will increase the window size until efficiency
* drops below the efficiency of the local maxima times this value.
*
- * @param efficiencyThreshold The limit to set.
- * @return This, to allow chaining.
+ * @param efficiencyThreshold the limit to set
+ * @return this, to allow chaining
* @see #setWindowSizeBackOff(double)
*/
public DynamicThrottlePolicy setEfficiencyThreshold(double efficiencyThreshold) {
@@ -148,8 +148,8 @@ public class DynamicThrottlePolicy extends StaticThrottlePolicy {
/**
* Sets the step size used when increasing window size.
*
- * @param windowSizeIncrement The step size to set.
- * @return This, to allow chaining.
+ * @param windowSizeIncrement the step size to set
+ * @return this, to allow chaining
*/
public DynamicThrottlePolicy setWindowSizeIncrement(double windowSizeIncrement) {
this.windowSizeIncrement = windowSizeIncrement;
@@ -161,8 +161,8 @@ public class DynamicThrottlePolicy extends StaticThrottlePolicy {
* A value of 1 means that there is no back off from the local maxima, and means that the algorithm will fail to
* reduce window size to something lower than a previous maxima. This value is capped to the [0, 1] range.
*
- * @param windowSizeBackOff The back off to set.
- * @return This, to allow chaining.
+ * @param windowSizeBackOff the back off to set
+ * @return this, to allow chaining
*/
public DynamicThrottlePolicy setWindowSizeBackOff(double windowSizeBackOff) {
this.windowSizeBackOff = Math.max(0, Math.min(1, windowSizeBackOff));
@@ -173,8 +173,8 @@ public class DynamicThrottlePolicy extends StaticThrottlePolicy {
* Sets the rate at which the window size is updated. The larger the value, the less responsive the resizing
* becomes. However, the smaller the value, the less accurate the measurements become.
*
- * @param resizeRate The rate to set.
- * @return This, to allow chaining.
+ * @param resizeRate the rate to set
+ * @return this, to allow chaining
*/
public DynamicThrottlePolicy setResizeRate(double resizeRate) {
this.resizeRate = resizeRate;
@@ -186,8 +186,8 @@ public class DynamicThrottlePolicy extends StaticThrottlePolicy {
* will be allocated to this clients. Resources are shared between clients
* proportiannally to their weights.
*
- * @param weight The weight to set.
- * @return This, to allow chaining.
+ * @param weight the weight to set
+ * @return this, to allow chaining
*/
public DynamicThrottlePolicy setWeight(double weight) {
this.weight = weight;
@@ -198,8 +198,8 @@ public class DynamicThrottlePolicy extends StaticThrottlePolicy {
* Sets the maximium number of pending operations allowed at any time, in
* order to avoid using too much resources.
*
- * @param max The max to set.
- * @return This, to allow chaining.
+ * @param max the max to set
+ * @return this, to allow chaining
*/
public DynamicThrottlePolicy setMaxWindowSize(double max) {
this.maxWindowSize = max;
@@ -209,7 +209,7 @@ public class DynamicThrottlePolicy extends StaticThrottlePolicy {
/**
* Get the maximum number of pending operations allowed at any time.
*
- * @return The maximum number of operations.
+ * @return the maximum number of operations
*/
public double getMaxWindowSize() {
return maxWindowSize;
@@ -220,8 +220,8 @@ public class DynamicThrottlePolicy extends StaticThrottlePolicy {
* Sets the minimium number of pending operations allowed at any time, in
* order to keep a level of performance.
*
- * @param min The min to set.
- * @return This, to allow chaining.
+ * @param min the min to set
+ * @return this, to allow chaining
*/
public DynamicThrottlePolicy setMinWindowSize(double min) {
this.minWindowSize = min;
@@ -231,7 +231,7 @@ public class DynamicThrottlePolicy extends StaticThrottlePolicy {
/**
* Get the minimum number of pending operations allowed at any time.
*
- * @return The minimum number of operations.
+ * @return the minimum number of operations
*/
public double getMinWindowSize() {
return minWindowSize;
@@ -247,7 +247,7 @@ public class DynamicThrottlePolicy extends StaticThrottlePolicy {
/**
* Returns the maximum number of pending messages allowed.
*
- * @return The max limit.
+ * @return the max limit
*/
public int getMaxPendingCount() {
return (int)windowSize;
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/RateThrottlingPolicy.java b/messagebus/src/main/java/com/yahoo/messagebus/RateThrottlingPolicy.java
index 6b073b737db..029c6a8074c 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/RateThrottlingPolicy.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/RateThrottlingPolicy.java
@@ -35,8 +35,8 @@ public class RateThrottlingPolicy extends StaticThrottlePolicy {
currentPeriod = timer.milliTime() / PERIOD;
}
- public boolean canSend(Message msg, int pendingCount) {
- if (!super.canSend(msg, pendingCount)) {
+ public boolean canSend(Message message, int pendingCount) {
+ if (!super.canSend(message, pendingCount)) {
return false;
}
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java b/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java
index 4fdfa341e3b..2a4a123a60b 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/SourceSession.java
@@ -112,11 +112,11 @@ public final class SourceSession implements ReplyHandler, MessageBus.SendBlocked
* } while (!result.isAccepted());
* </code>
*
- * @param msg the message to send
+ * @param message the message to send
* @return the result of <i>initiating</i> sending of this message
*/
- public Result send(Message msg) {
- return sendInternal(updateTiming(msg));
+ public Result send(Message message) {
+ return sendInternal(updateTiming(message));
}
private Message updateTiming(Message msg) {
@@ -127,29 +127,29 @@ public final class SourceSession implements ReplyHandler, MessageBus.SendBlocked
return msg;
}
- private Result sendInternal(Message msg) {
+ private Result sendInternal(Message message) {
synchronized (lock) {
if (closed) {
return new Result(ErrorCode.SEND_QUEUE_CLOSED,
"Source session is closed.");
}
- if (throttlePolicy != null && ! throttlePolicy.canSend(msg, pendingCount)) {
+ if (throttlePolicy != null && ! throttlePolicy.canSend(message, pendingCount)) {
return new Result(ErrorCode.SEND_QUEUE_FULL,
"Too much pending data (" + pendingCount + " messages).");
}
- msg.pushHandler(replyHandler);
+ message.pushHandler(replyHandler);
if (throttlePolicy != null) {
- throttlePolicy.processMessage(msg);
+ throttlePolicy.processMessage(message);
}
++pendingCount;
}
- if (msg.getTrace().shouldTrace(TraceLevel.COMPONENT)) {
- msg.getTrace().trace(TraceLevel.COMPONENT,
- "Source session accepted a " + msg.getApproxSize() + " byte message. " +
+ if (message.getTrace().shouldTrace(TraceLevel.COMPONENT)) {
+ message.getTrace().trace(TraceLevel.COMPONENT,
+ "Source session accepted a " + message.getApproxSize() + " byte message. " +
pendingCount + " message(s) now pending.");
}
- msg.pushHandler(this);
- sequencer.handleMessage(msg);
+ message.pushHandler(this);
+ sequencer.handleMessage(message);
return Result.ACCEPTED;
}
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/SourceSessionParams.java b/messagebus/src/main/java/com/yahoo/messagebus/SourceSessionParams.java
index e671a36fa26..e9796e6c4d2 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/SourceSessionParams.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/SourceSessionParams.java
@@ -24,7 +24,7 @@ public class SourceSessionParams {
/**
* Implements the copy constructor.
*
- * @param params The object to copy.
+ * @param params the object to copy
*/
public SourceSessionParams(SourceSessionParams params) {
throttlePolicy = params.throttlePolicy;
@@ -32,11 +32,7 @@ public class SourceSessionParams {
replyHandler = params.replyHandler;
}
- /**
- * Returns the policy to use for throttling output.
- *
- * @return The policy.
- */
+ /** Returns the policy to use for throttling output. */
public ThrottlePolicy getThrottlePolicy() {
return throttlePolicy;
}
@@ -45,7 +41,7 @@ public class SourceSessionParams {
* Sets the policy to use for throttling output.
*
* @param throttlePolicy The policy to set.
- * @return This, to allow chaining.
+ * @return this, to allow chaining
*/
public SourceSessionParams setThrottlePolicy(ThrottlePolicy throttlePolicy) {
this.throttlePolicy = throttlePolicy;
@@ -55,7 +51,7 @@ public class SourceSessionParams {
/**
* Returns the number of seconds a message can spend trying to succeed.
*
- * @return The timeout in seconds.
+ * @return the timeout in seconds
*/
public double getTimeout() {
return timeout;
@@ -66,7 +62,7 @@ public class SourceSessionParams {
* for any message bus operation.
*
* @param timeout The numer of seconds allowed.
- * @return This, to allow chaining.
+ * @return this, to allow chaining
*/
public SourceSessionParams setTimeout(double timeout) {
this.timeout = timeout;
@@ -76,7 +72,7 @@ public class SourceSessionParams {
/**
* Returns whether or not a reply handler has been assigned to this.
*
- * @return True if a handler is set.
+ * @return true if a handler is set
*/
boolean hasReplyHandler() {
return replyHandler != null;
@@ -85,7 +81,7 @@ public class SourceSessionParams {
/**
* Returns the handler to receive incoming replies.
*
- * @return The handler.
+ * @return the handler
*/
public ReplyHandler getReplyHandler() {
return replyHandler;
@@ -95,10 +91,11 @@ public class SourceSessionParams {
* Sets the handler to recive incoming replies.
*
* @param handler The handler to set.
- * @return This, to allow chaining.
+ * @return this, to allow chaining
*/
public SourceSessionParams setReplyHandler(ReplyHandler handler) {
replyHandler = handler;
return this;
}
+
}
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/StaticThrottlePolicy.java b/messagebus/src/main/java/com/yahoo/messagebus/StaticThrottlePolicy.java
index 7d73cd4bf3f..aa1bc1ce624 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/StaticThrottlePolicy.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/StaticThrottlePolicy.java
@@ -2,7 +2,7 @@
package com.yahoo.messagebus;
/**
- * This is an implementatin of the {@link ThrottlePolicy} that offers static limits to the amount of pending data a
+ * This is an implementation of the {@link ThrottlePolicy} that offers static limits to the amount of pending data a
* {@link SourceSession} is allowed to have. You may choose to set a limit to the total number of pending messages (by
* way of {@link #setMaxPendingCount(int)}), the total size of pending messages (by way of {@link
* #setMaxPendingSize(long)}), or some combination thereof.
@@ -17,7 +17,8 @@ public class StaticThrottlePolicy implements ThrottlePolicy {
private long maxPendingSize = 0;
private long pendingSize = 0;
- public boolean canSend(Message msg, int pendingCount) {
+ @Override
+ public boolean canSend(Message message, int pendingCount) {
if (maxPendingCount > 0 && pendingCount >= maxPendingCount) {
return false;
}
@@ -27,12 +28,14 @@ public class StaticThrottlePolicy implements ThrottlePolicy {
return true;
}
- public void processMessage(Message msg) {
- int size = msg.getApproxSize();
- msg.setContext(size);
+ @Override
+ public void processMessage(Message message) {
+ int size = message.getApproxSize();
+ message.setContext(size);
pendingSize += size;
}
+ @Override
public void processReply(Reply reply) {
int size = (Integer)reply.getContext();
pendingSize -= size;
@@ -41,7 +44,7 @@ public class StaticThrottlePolicy implements ThrottlePolicy {
/**
* Returns the maximum number of pending messages allowed.
*
- * @return The max limit.
+ * @return the max limit
*/
public int getMaxPendingCount() {
return maxPendingCount;
@@ -50,8 +53,8 @@ public class StaticThrottlePolicy implements ThrottlePolicy {
/**
* Sets the maximum number of pending messages allowed.
*
- * @param maxCount The max count.
- * @return This, to allow chaining.
+ * @param maxCount The max count
+ * @return this, to allow chaining
*/
public StaticThrottlePolicy setMaxPendingCount(int maxCount) {
maxPendingCount = maxCount;
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/ThrottlePolicy.java b/messagebus/src/main/java/com/yahoo/messagebus/ThrottlePolicy.java
index 30a0b82f2cd..1c7595f2b5c 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/ThrottlePolicy.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/ThrottlePolicy.java
@@ -14,23 +14,24 @@ public interface ThrottlePolicy {
/**
* Returns whether or not the given message can be sent according to the current state of this policy.
*
- * @param msg The message to evaluate.
- * @param pendingCount The current number of pending messages.
- * @return True to send the message.
+ * @param message the message to evaluate
+ * @param pendingCount the current number of pending messages
+ * @return true to send the message
*/
- public boolean canSend(Message msg, int pendingCount);
+ boolean canSend(Message message, int pendingCount);
/**
* This method is called once for every message that was accepted by {@link #canSend(Message, int)} and sent.
*
- * @param msg The message beint sent.
+ * @param message the message being sent
*/
- public void processMessage(Message msg);
+ void processMessage(Message message);
/**
* This method is called once for every reply that is received.
*
- * @param reply The reply received.
+ * @param reply the reply received
*/
- public void processReply(Reply reply);
+ void processReply(Reply reply);
+
}