diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-04-07 12:09:02 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2020-04-07 12:09:02 +0000 |
commit | 27f3c2a9cc66b272d4473bbbc0e7dbf58cd1c78a (patch) | |
tree | 39bbae887b1494bfbd5f815326bf47b0f739e1db /messagebus/src | |
parent | c7f2e56fdb745c2c0c61e2f59f9c45ee873531be (diff) |
Let feeder control throttle parameters
Diffstat (limited to 'messagebus/src')
-rw-r--r-- | messagebus/src/main/java/com/yahoo/messagebus/DynamicThrottlePolicy.java | 23 |
1 files changed, 19 insertions, 4 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/DynamicThrottlePolicy.java b/messagebus/src/main/java/com/yahoo/messagebus/DynamicThrottlePolicy.java index 525d7ae8867..13e90b9a65d 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/DynamicThrottlePolicy.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/DynamicThrottlePolicy.java @@ -27,6 +27,7 @@ public class DynamicThrottlePolicy extends StaticThrottlePolicy { private double windowSizeIncrement = 20; private double windowSize = windowSizeIncrement; private double minWindowSize = windowSizeIncrement; + private double decrementFactor = 2.0; private double maxWindowSize = Integer.MAX_VALUE; private double windowSizeBackOff = 0.9; private double weight = 1.0; @@ -93,15 +94,15 @@ public class DynamicThrottlePolicy extends StaticThrottlePolicy { numSent = 0; numOk = 0; - if (log.isLoggable(LogLevel.DEBUG)) { - log.log(LogLevel.DEBUG, "windowSize " + windowSize + " throughput " + throughput); - } if (maxThroughput > 0 && throughput > maxThroughput * 0.95) { // No need to increase window when we're this close to max. } else if (throughput > localMaxThroughput * 1.01) { localMaxThroughput = throughput; windowSize += weight*windowSizeIncrement; + if (log.isLoggable(LogLevel.DEBUG)) { + log.log(LogLevel.DEBUG, "windowSize " + windowSize + " throughput " + throughput + " local max " + localMaxThroughput); + } } else { // scale up/down throughput for comparing to window size double period = 1; @@ -113,11 +114,14 @@ public class DynamicThrottlePolicy extends StaticThrottlePolicy { } double efficiency = throughput*period/windowSize; if (efficiency < efficiencyThreshold) { - windowSize = Math.min(windowSize * windowSizeBackOff, windowSize - 2* windowSizeIncrement); + windowSize = Math.max(windowSize * windowSizeBackOff, windowSize - decrementFactor * windowSizeIncrement); localMaxThroughput = 0; } else { windowSize += weight*windowSizeIncrement; } + if (log.isLoggable(LogLevel.DEBUG)) { + log.log(LogLevel.DEBUG, "windowSize " + windowSize + " throughput " + throughput + " local max " + localMaxThroughput + " efficiency " + efficiency); + } } windowSize = Math.max(minWindowSize, windowSize); windowSize = Math.min(maxWindowSize, windowSize); @@ -157,6 +161,17 @@ public class DynamicThrottlePolicy extends StaticThrottlePolicy { } /** + * Sets the relative stepsize when decreasing window size. + * + * @param decrementFactor the step size to set + * @return this, to allow chaining + */ + public DynamicThrottlePolicy setWindowSizeDecrementFactor(double decrementFactor) { + this.decrementFactor = decrementFactor; + return this; + } + + /** * Sets the factor of window size to back off to when the algorithm determines that efficiency is not increasing. * A value of 1 means that there is no back off from the local maxima, and means that the algorithm will fail to * reduce window size to something lower than a previous maxima. This value is capped to the [0, 1] range. |