diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-04-07 12:09:02 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2020-04-07 12:09:02 +0000 |
commit | 27f3c2a9cc66b272d4473bbbc0e7dbf58cd1c78a (patch) | |
tree | 39bbae887b1494bfbd5f815326bf47b0f739e1db | |
parent | c7f2e56fdb745c2c0c61e2f59f9c45ee873531be (diff) |
Let feeder control throttle parameters
4 files changed, 94 insertions, 8 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/DynamicThrottlePolicy.java b/messagebus/src/main/java/com/yahoo/messagebus/DynamicThrottlePolicy.java index 525d7ae8867..13e90b9a65d 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/DynamicThrottlePolicy.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/DynamicThrottlePolicy.java @@ -27,6 +27,7 @@ public class DynamicThrottlePolicy extends StaticThrottlePolicy { private double windowSizeIncrement = 20; private double windowSize = windowSizeIncrement; private double minWindowSize = windowSizeIncrement; + private double decrementFactor = 2.0; private double maxWindowSize = Integer.MAX_VALUE; private double windowSizeBackOff = 0.9; private double weight = 1.0; @@ -93,15 +94,15 @@ public class DynamicThrottlePolicy extends StaticThrottlePolicy { numSent = 0; numOk = 0; - if (log.isLoggable(LogLevel.DEBUG)) { - log.log(LogLevel.DEBUG, "windowSize " + windowSize + " throughput " + throughput); - } if (maxThroughput > 0 && throughput > maxThroughput * 0.95) { // No need to increase window when we're this close to max. } else if (throughput > localMaxThroughput * 1.01) { localMaxThroughput = throughput; windowSize += weight*windowSizeIncrement; + if (log.isLoggable(LogLevel.DEBUG)) { + log.log(LogLevel.DEBUG, "windowSize " + windowSize + " throughput " + throughput + " local max " + localMaxThroughput); + } } else { // scale up/down throughput for comparing to window size double period = 1; @@ -113,11 +114,14 @@ public class DynamicThrottlePolicy extends StaticThrottlePolicy { } double efficiency = throughput*period/windowSize; if (efficiency < efficiencyThreshold) { - windowSize = Math.min(windowSize * windowSizeBackOff, windowSize - 2* windowSizeIncrement); + windowSize = Math.max(windowSize * windowSizeBackOff, windowSize - decrementFactor * windowSizeIncrement); localMaxThroughput = 0; } else { windowSize += weight*windowSizeIncrement; } + if (log.isLoggable(LogLevel.DEBUG)) { + log.log(LogLevel.DEBUG, "windowSize " + windowSize + " throughput " + throughput + " local max " + localMaxThroughput + " efficiency " + efficiency); + } } windowSize = Math.max(minWindowSize, windowSize); windowSize = Math.min(maxWindowSize, windowSize); @@ -157,6 +161,17 @@ public class DynamicThrottlePolicy extends StaticThrottlePolicy { } /** + * Sets the relative stepsize when decreasing window size. + * + * @param decrementFactor the step size to set + * @return this, to allow chaining + */ + public DynamicThrottlePolicy setWindowSizeDecrementFactor(double decrementFactor) { + this.decrementFactor = decrementFactor; + return this; + } + + /** * Sets the factor of window size to back off to when the algorithm determines that efficiency is not increasing. * A value of 1 means that there is no back off from the local maxima, and means that the algorithm will fail to * reduce window size to something lower than a previous maxima. This value is capped to the [0, 1] range. diff --git a/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/FeederParams.java b/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/FeederParams.java index c1e164f7fe8..0ee1db5dbff 100644 --- a/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/FeederParams.java +++ b/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/FeederParams.java @@ -34,6 +34,12 @@ class FeederParams { private boolean benchmarkMode = false; private int numDispatchThreads = 1; private int maxPending = 0; + + private double windowSizeBackOff = 0.95; + private double windowDecrementFactor = 1.2; + private double windowResizeRate = 3; + private int windowIncrementSize = 20; + private int numConnectionsPerTarget = 1; private long numMessagesToSend = Long.MAX_VALUE; private List<InputStream> inputStreams = new ArrayList<>(); @@ -83,6 +89,21 @@ class FeederParams { this.configId = configId; return this; } + public double getWindowSizeBackOff() { + return windowSizeBackOff; + } + + public double getWindowDecrementFactor() { + return windowDecrementFactor; + } + + public double getWindowResizeRate() { + return windowResizeRate; + } + + public int getWindowIncrementSize() { + return windowIncrementSize; + } int getNumConnectionsPerTarget() { return numConnectionsPerTarget; } @@ -117,6 +138,10 @@ class FeederParams { opts.addOption("o", "output", true, "File to write to. Extensions gives format (.xml, .json, .vespa) json will be produced if no extension."); opts.addOption("c", "numconnections", true, "Number of connections per host."); opts.addOption("l", "nummessages", true, "Number of messages to send (all is default)."); + opts.addOption("wi", "window_incrementsize", true, "Dynamic window increment step size. default = " + windowIncrementSize); + opts.addOption("wd", "window_decrementfactor", true, "Dynamic window decrement step size factor. default = " + windowDecrementFactor); + opts.addOption("wb", "window_backoffactor", true, "Dynamic window backoff factor. default = " + windowSizeBackOff); + opts.addOption("wr", "window_resizerate", true, "Dynamic window resize rate. default = " + windowResizeRate); CommandLine cmd = new DefaultParser().parse(opts, args); @@ -129,6 +154,18 @@ class FeederParams { if (cmd.hasOption('c')) { numConnectionsPerTarget = Integer.valueOf(cmd.getOptionValue('c').trim()); } + if (cmd.hasOption("wi")) { + windowIncrementSize = Integer.valueOf(cmd.getOptionValue("wi").trim()); + } + if (cmd.hasOption("wi")) { + windowDecrementFactor = Double.valueOf(cmd.getOptionValue("wi").trim()); + } + if (cmd.hasOption("wb")) { + windowSizeBackOff = Double.valueOf(cmd.getOptionValue("wb").trim()); + } + if (cmd.hasOption("wr")) { + windowResizeRate = Double.valueOf(cmd.getOptionValue("wr").trim()); + } if (cmd.hasOption('r')) { route = Route.parse(cmd.getOptionValue('r').trim()); } diff --git a/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java b/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java index 556d9bd60c7..18c0de339e6 100644 --- a/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java +++ b/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java @@ -20,6 +20,7 @@ import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage; import com.yahoo.documentapi.messagebus.protocol.RemoveDocumentMessage; import com.yahoo.documentapi.messagebus.protocol.UpdateDocumentMessage; import com.yahoo.io.GrowableByteBuffer; +import com.yahoo.messagebus.DynamicThrottlePolicy; import com.yahoo.messagebus.Error; import com.yahoo.messagebus.Message; import com.yahoo.messagebus.MessageBusParams; @@ -346,7 +347,7 @@ public class SimpleFeeder implements ReplyHandler { numThreads = params.getNumDispatchThreads(); numMessagesToSend = params.getNumMessagesToSend(); mbus = newMessageBus(docTypeMgr, params); - session = newSession(mbus, this, params.getMaxPending()); + session = newSession(mbus, this, params); docTypeMgr.configure(params.getConfigId()); benchmarkMode = params.isBenchmarkMode(); destination = (params.getDumpStream() != null) @@ -473,11 +474,18 @@ public class SimpleFeeder implements ReplyHandler { params.getConfigId()); } - private static SourceSession newSession(RPCMessageBus mbus, ReplyHandler replyHandler, int maxPending) { + private static SourceSession newSession(RPCMessageBus mbus, ReplyHandler replyHandler, FeederParams feederParams ) { SourceSessionParams params = new SourceSessionParams(); params.setReplyHandler(replyHandler); - if (maxPending > 0) { - params.setThrottlePolicy(new StaticThrottlePolicy().setMaxPendingCount(maxPending)); + if (feederParams.getMaxPending() > 0) { + params.setThrottlePolicy(new StaticThrottlePolicy().setMaxPendingCount(feederParams.getMaxPending())); + } else { + DynamicThrottlePolicy throttlePolicy = new DynamicThrottlePolicy() + .setWindowSizeIncrement(feederParams.getWindowIncrementSize()) + .setResizeRate(feederParams.getWindowResizeRate()) + .setWindowSizeDecrementFactor(feederParams.getWindowDecrementFactor()) + .setWindowSizeBackOff(feederParams.getWindowSizeBackOff()); + params.setThrottlePolicy(throttlePolicy); } return mbus.getMessageBus().createSourceSession(params); } diff --git a/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/FeederParamsTest.java b/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/FeederParamsTest.java index 13e307d9973..bbc27cfa084 100644 --- a/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/FeederParamsTest.java +++ b/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/FeederParamsTest.java @@ -101,6 +101,32 @@ public class FeederParamsTest { } @Test + public void requireThatWindowSizeIncrementIsParsed() throws ParseException, FileNotFoundException { + assertEquals(20, new FeederParams().getWindowIncrementSize()); + assertEquals(17, new FeederParams().parseArgs("--window_incrementsize", "17").getWindowIncrementSize()); + } + + static final double SMALL_NUMBER = 0.000000000001; + + @Test + public void requireThatWindowSizeDecrementFactorIsParsed() throws ParseException, FileNotFoundException { + assertEquals(1.2, new FeederParams().getWindowDecrementFactor(), SMALL_NUMBER); + assertEquals(1.3, new FeederParams().parseArgs("--window_decrementfactor", "1.3").getWindowIncrementSize(), SMALL_NUMBER); + } + + @Test + public void requireThatWindowResizeRateIsParsed() throws ParseException, FileNotFoundException { + assertEquals(3.0, new FeederParams().getWindowResizeRate(), SMALL_NUMBER); + assertEquals(5.5, new FeederParams().parseArgs("--window_resizerate", "5.5").getWindowResizeRate(), SMALL_NUMBER); + } + + @Test + public void requireThatWindowBackOffIsParsed() throws ParseException, FileNotFoundException { + assertEquals(0.95, new FeederParams().getWindowSizeBackOff(), SMALL_NUMBER); + assertEquals(0.97, new FeederParams().parseArgs("--window_backoff", "0.97").getWindowSizeBackOff(), SMALL_NUMBER); + } + + @Test public void requireThatDumpStreamAreParsed() throws ParseException, IOException { assertNull(new FeederParams().getDumpStream()); |