summaryrefslogtreecommitdiffstats
path: root/messagebus
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-04-07 12:09:02 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2020-04-07 12:09:02 +0000
commit27f3c2a9cc66b272d4473bbbc0e7dbf58cd1c78a (patch)
tree39bbae887b1494bfbd5f815326bf47b0f739e1db /messagebus
parentc7f2e56fdb745c2c0c61e2f59f9c45ee873531be (diff)
Let feeder control throttle parameters
Diffstat (limited to 'messagebus')
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/DynamicThrottlePolicy.java23
1 files changed, 19 insertions, 4 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/DynamicThrottlePolicy.java b/messagebus/src/main/java/com/yahoo/messagebus/DynamicThrottlePolicy.java
index 525d7ae8867..13e90b9a65d 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/DynamicThrottlePolicy.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/DynamicThrottlePolicy.java
@@ -27,6 +27,7 @@ public class DynamicThrottlePolicy extends StaticThrottlePolicy {
private double windowSizeIncrement = 20;
private double windowSize = windowSizeIncrement;
private double minWindowSize = windowSizeIncrement;
+ private double decrementFactor = 2.0;
private double maxWindowSize = Integer.MAX_VALUE;
private double windowSizeBackOff = 0.9;
private double weight = 1.0;
@@ -93,15 +94,15 @@ public class DynamicThrottlePolicy extends StaticThrottlePolicy {
numSent = 0;
numOk = 0;
- if (log.isLoggable(LogLevel.DEBUG)) {
- log.log(LogLevel.DEBUG, "windowSize " + windowSize + " throughput " + throughput);
- }
if (maxThroughput > 0 && throughput > maxThroughput * 0.95) {
// No need to increase window when we're this close to max.
} else if (throughput > localMaxThroughput * 1.01) {
localMaxThroughput = throughput;
windowSize += weight*windowSizeIncrement;
+ if (log.isLoggable(LogLevel.DEBUG)) {
+ log.log(LogLevel.DEBUG, "windowSize " + windowSize + " throughput " + throughput + " local max " + localMaxThroughput);
+ }
} else {
// scale up/down throughput for comparing to window size
double period = 1;
@@ -113,11 +114,14 @@ public class DynamicThrottlePolicy extends StaticThrottlePolicy {
}
double efficiency = throughput*period/windowSize;
if (efficiency < efficiencyThreshold) {
- windowSize = Math.min(windowSize * windowSizeBackOff, windowSize - 2* windowSizeIncrement);
+ windowSize = Math.max(windowSize * windowSizeBackOff, windowSize - decrementFactor * windowSizeIncrement);
localMaxThroughput = 0;
} else {
windowSize += weight*windowSizeIncrement;
}
+ if (log.isLoggable(LogLevel.DEBUG)) {
+ log.log(LogLevel.DEBUG, "windowSize " + windowSize + " throughput " + throughput + " local max " + localMaxThroughput + " efficiency " + efficiency);
+ }
}
windowSize = Math.max(minWindowSize, windowSize);
windowSize = Math.min(maxWindowSize, windowSize);
@@ -157,6 +161,17 @@ public class DynamicThrottlePolicy extends StaticThrottlePolicy {
}
/**
+ * Sets the relative stepsize when decreasing window size.
+ *
+ * @param decrementFactor the step size to set
+ * @return this, to allow chaining
+ */
+ public DynamicThrottlePolicy setWindowSizeDecrementFactor(double decrementFactor) {
+ this.decrementFactor = decrementFactor;
+ return this;
+ }
+
+ /**
* Sets the factor of window size to back off to when the algorithm determines that efficiency is not increasing.
* A value of 1 means that there is no back off from the local maxima, and means that the algorithm will fail to
* reduce window size to something lower than a previous maxima. This value is capped to the [0, 1] range.