From ac5041033b8782e2cb8b1f64f880f004c510bea6 Mon Sep 17 00:00:00 2001 From: Jon Marius Venstad Date: Wed, 2 Dec 2020 15:09:25 +0100 Subject: Add unit tests and some doc for DynamicThrottlePolicy --- .../yahoo/messagebus/DynamicThrottlePolicy.java | 75 +++-- .../messagebus/DynamicThrottlePolicyTest.java | 353 +++++++++++++++++++++ 2 files changed, 410 insertions(+), 18 deletions(-) create mode 100644 messagebus/src/test/java/com/yahoo/messagebus/DynamicThrottlePolicyTest.java diff --git a/messagebus/src/main/java/com/yahoo/messagebus/DynamicThrottlePolicy.java b/messagebus/src/main/java/com/yahoo/messagebus/DynamicThrottlePolicy.java index 514f1234e89..63ac44fca74 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/DynamicThrottlePolicy.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/DynamicThrottlePolicy.java @@ -8,11 +8,43 @@ import java.util.logging.Logger; /** * This is an implementation of the {@link ThrottlePolicy} that offers dynamic limits to the number of pending messages a - * {@link SourceSession} is allowed to have. - * - * NOTE: By context, "pending" is referring to the number of sent messages that have not been replied to yet. + * {@link SourceSession} is allowed to have. Pending means the number of sent messages that have not been replied to yet. + *

+ * The algorithm works by increasing the number of messages allowed to be pending, the window size, until + * this no longer increases throughput. At this point, the algorithm is driven by a synthetic attraction towards latencies + * which satisfy log10(1 / latency) % 1 = e, for some constant 0 < e < 1. Weird? Most certainly! + *

+ * The effect is that the window size the algorithm produces, for a saturated ideal server, has a level for each power + * of ten, with an attractor towards which the window size tends while on that level, determined by the e above. + * The {@link #setEfficiencyThreshold} determines the e constant (see the sketch below). When e is set so the + * attractor is close to the start of the interval, this has an inhibiting effect on the algorithm, and it is basically + * reduced to "increase the window size until this no longer increases throughput enough to defeat the random noise". + * As the attractor moves towards the end of the interval, the algorithm becomes increasingly eager to increase + * its window size past what it measures as effective — if moved to the very end of the interval, the algorithm explodes. + * The default setting has the attractor at log10(2) of the way from start to end of these levels. + *

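To make the attractor mechanics above concrete, here is a minimal sketch of the level computation, mirroring the update step added further down in this patch; the class, the method name and the assumed server capacity are illustrative only:

class EfficiencySketch {

    /**
     * Scales throughput-per-window into the interval (0.2, 2] by powers of ten, so that every
     * tenfold increase in window size maps onto the same "level". The policy backs off whenever
     * this value drops below the efficiency threshold, which is what makes the threshold act as
     * an attractor on each level.
     */
    static double positionOnLevel(double throughput, double windowSize) {
        double period = 1;
        while (throughput * period / windowSize < 2) period *= 10;
        while (throughput * period / windowSize > 2) period *= 0.1;
        return throughput * period / windowSize;
    }

    public static void main(String[] args) {
        // For a saturated server with a fixed capacity (here assumed to be 1000 replies per time unit),
        // the position on the level repeats for every decade of window size, creating one attractor per decade.
        for (double window : new double[] { 150, 300, 600, 1500, 3000, 6000 })
            System.out.printf("window %6.0f -> position on level %.3f%n", window, positionOnLevel(1000, window));
    }

}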
+ * Because the algorithm stops increasing the window size when increases in throughput drown in random variations, the + * {@link #setWindowSizeIncrement} directly influences the window size range in which the algorithm works efficiently. With the default + * setting of 20, it seems to struggle to increase past window sizes of a couple thousand (see the arithmetic sketched below). Using a static + * increment (and a back-off factor) seems to be necessary to effectively balance the load that different, competing policies + * are allowed to produce. + *

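The impact of the increment can be seen with simple arithmetic; the sketch below uses the default increment of 20 mentioned above, and the window sizes are arbitrary illustrative values:

class IncrementSketch {

    public static void main(String[] args) {
        double windowSizeIncrement = 20; // the default discussed above
        double weight = 1.0;             // a single policy with default weight
        // Each positive measurement grows the window by weight * increment, so the relative gain in
        // throughput that must be detected shrinks as the window grows; around a few thousand pending
        // operations that gain is small enough to drown in ordinary measurement noise.
        for (double windowSize : new double[] { 100, 500, 2_000, 10_000 }) {
            double relativeGrowth = weight * windowSizeIncrement / windowSize;
            System.out.printf("window %6.0f -> one increment is a %.2f%% change%n", windowSize, 100 * relativeGrowth);
        }
    }

}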
+ * When there are multiple policies that compete against each other, they will increase load until saturating the server. + * If all policies but one were kept fixed, that one policy would still see an increase in throughput with increased + * window size, even with a saturated server, as it would be given a greater share of the server's resources. However, + * since all algorithms adjust their windows concurrently, they all reduce the throughput of the other algorithms. + * The result is that they commonly see the server as saturated, and commonly reach the behaviour where some increases in + * window size lead to measurable throughput gains, while others don't. + *

+ * Now the weighting ({@link #setWeight}) comes into play: with equal weights, two algorithms would have a break-even + * between being governed by the attractors (above), which eventually limit window size, and increases due to positive + * measurements, at the same point along the window size axis. With smaller weights, i.e., smaller increases to window + * size, this break-even occurs where the curve is steeper, i.e., where the client has a smaller share of the server. + * Thus, competing algorithms with different weights end up with a resource distribution roughly proportional to weight, + * as illustrated by the expected-share computation below. + *

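The proportionality can be made concrete with a small computation; this mirrors the expected-share formula used by the ten-policy test added in this patch, and the shares should be read as approximate:

class WeightShareSketch {

    public static void main(String[] args) {
        // Ten policies with weights 0.1, 0.2, ..., 1.0, as in tenPoliciesVeryParallelServerWithShortTaskQueue below:
        // the expected share of the server for a policy is its weight divided by the sum of all weights.
        int p = 10;
        double totalWeight = 0;
        for (int j = 0; j < p; j++) totalWeight += (j + 1.0) / p;
        for (int j = 0; j < p; j++) {
            double weight = (j + 1.0) / p;
            double expectedShare = weight / totalWeight; // equals (j + 1) / (0.5 * p * (p + 1)), as asserted in the test
            System.out.printf("weight %.1f -> expected share %.3f%n", weight, expectedShare);
        }
    }

}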
* * @author Simon Thoresen Hult + * @author jonmv */ public class DynamicThrottlePolicy extends StaticThrottlePolicy { @@ -97,30 +129,31 @@ public class DynamicThrottlePolicy extends StaticThrottlePolicy { numSent = 0; numOk = 0; - if (maxThroughput > 0 && throughput > maxThroughput * 0.95) { // No need to increase window when we're this close to max. - } else if (throughput >= localMaxThroughput) { + // TODO jonmv: Not so sure — what if we're too high, and should back off? + } else if (throughput > localMaxThroughput) { localMaxThroughput = throughput; - windowSize += weight*windowSizeIncrement; + windowSize += weight * windowSizeIncrement; if (log.isLoggable(Level.FINE)) { log.log(Level.FINE, "windowSize " + windowSize + " throughput " + throughput + " local max " + localMaxThroughput); } } else { // scale up/down throughput for comparing to window size double period = 1; - while(throughput * period/windowSize < 2) { + while(throughput * period / windowSize < 2) { period *= 10; } - while(throughput * period/windowSize > 2) { + while(throughput * period / windowSize > 2) { period *= 0.1; } - double efficiency = throughput*period/windowSize; + double efficiency = throughput * period / windowSize; // "efficiency" is a strange name; it is the position on the current level. if (efficiency < efficiencyThreshold) { windowSize = Math.min(windowSize * windowSizeBackOff, windowSize - decrementFactor * windowSizeIncrement); localMaxThroughput = 0; } else { - windowSize += weight*windowSizeIncrement; + windowSize += weight * windowSizeIncrement; } if (log.isLoggable(Level.FINE)) { log.log(Level.FINE, "windowSize " + windowSize + " throughput " + throughput + " local max " + localMaxThroughput + " efficiency " + efficiency); @@ -139,6 +172,11 @@ public class DynamicThrottlePolicy extends StaticThrottlePolicy { } /** + * Determines where on each latency level the attractor sits. 2 is at the very end, and makes this go *boom*. + * 0.2 is at the very start, and makes the algorithm more conservative. Probably best to stay away from this. + */ + // Original javadoc is nonsense, but kept for historical reasons. + /* * Sets the lower efficiency threshold at which the algorithm should perform window size back off. Efficiency is * the correlation between throughput and window size. The algorithm will increase the window size until efficiency * drops below the efficiency of the local maxima times this value. @@ -177,8 +215,7 @@ public class DynamicThrottlePolicy extends StaticThrottlePolicy { /** * Sets the factor of window size to back off to when the algorithm determines that efficiency is not increasing. - * A value of 1 means that there is no back off from the local maxima, and means that the algorithm will fail to - * reduce window size to something lower than a previous maxima. This value is capped to the [0, 1] range. + * Capped to the [0, 1] range. * * @param windowSizeBackOff the back off to set * @return this, to allow chaining */ @@ -190,20 +227,20 @@ public class DynamicThrottlePolicy extends StaticThrottlePolicy { /** * Sets the rate at which the window size is updated. The larger the value, the less responsive the resizing - * becomes. However, the smaller the value, the less accurate the measurements become. + * becomes. However, the smaller the value, the less accurate the measurements become.
Capped below at 2. * * @param resizeRate the rate to set * @return this, to allow chaining */ public DynamicThrottlePolicy setResizeRate(double resizeRate) { - this.resizeRate = resizeRate; + this.resizeRate = Math.max(2, resizeRate); return this; } /** * Sets the weight for this client. The larger the value, the more resources - will be allocated to this clients. Resources are shared between clients - proportionally to the set weights. + will be allocated to this client. Resources are shared between clients roughly + proportionally to the set weights. Must be a positive number. * * @param weight the weight to set * @return this, to allow chaining */ @@ -214,7 +251,7 @@ public class DynamicThrottlePolicy extends StaticThrottlePolicy { } /** - * Sets the maximium number of pending operations allowed at any time, in + * Sets the maximum number of pending operations allowed at any time, in * order to avoid using too much resources. * * @param max the max to set @@ -236,7 +273,7 @@ public class DynamicThrottlePolicy extends StaticThrottlePolicy { /** - * Sets the minimium number of pending operations allowed at any time, in + * Sets the minimum number of pending operations allowed at any time, in * order to keep a level of performance. * * @param min the min to set @@ -273,4 +310,6 @@ public class DynamicThrottlePolicy extends StaticThrottlePolicy { return (int) windowSize; } + double getWindowSize() { return windowSize; } + } diff --git a/messagebus/src/test/java/com/yahoo/messagebus/DynamicThrottlePolicyTest.java b/messagebus/src/test/java/com/yahoo/messagebus/DynamicThrottlePolicyTest.java new file mode 100644 index 00000000000..7f48edfd48c --- /dev/null +++ b/messagebus/src/test/java/com/yahoo/messagebus/DynamicThrottlePolicyTest.java @@ -0,0 +1,353 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.messagebus; + +import com.yahoo.messagebus.test.SimpleMessage; +import com.yahoo.messagebus.test.SimpleReply; +import org.junit.Test; + +import java.util.ArrayDeque; +import java.util.Collections; +import java.util.Deque; +import java.util.List; +import java.util.Random; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; +import java.util.stream.IntStream; + +import static java.util.stream.Collectors.toList; +import static java.util.stream.Collectors.toUnmodifiableList; +import static org.junit.Assert.assertTrue; + +/** + * These tests are based on a simulated server, the {@link MockServer} below. + * The purpose is both to verify the behaviour of the algorithm, and to provide a playground + * for development, tuning, etc. + * + * @author jonmv + */ public class DynamicThrottlePolicyTest { + + static final Message message = new SimpleMessage("message"); + static final Reply success = new SimpleReply("success"); + static final Reply error = new SimpleReply("error"); + static { + success.setContext(message.getApproxSize()); + error.setContext(message.getApproxSize()); + error.addError(new Error(0, "overload")); + } + + @Test + public void singlePolicyWithSmallWindows() { + long operations = 1_000_000; + int numberOfWorkers = 1; + int maximumTasksPerWorker = 16; + int workerParallelism = 12; + + { // This setup is lucky with the artificial local maxima for latency, and gives good results. See below for counter-examples.
+ int workPerSuccess = 8; + + CustomTimer timer = new CustomTimer(); + DynamicThrottlePolicy policy = new DynamicThrottlePolicy(timer).setMinWindowSize(1) + .setWindowSizeIncrement(0.1) + .setResizeRate(100); + Summary summary = run(operations, workPerSuccess, numberOfWorkers, maximumTasksPerWorker, workerParallelism, timer, policy); + + double minMaxPending = numberOfWorkers * workerParallelism; + double maxMaxPending = numberOfWorkers * maximumTasksPerWorker; + System.err.println(operations / (double) timer.milliTime()); + assertInRange(minMaxPending, summary.averagePending, maxMaxPending); + assertInRange(minMaxPending, summary.averageWindows[0], maxMaxPending); + assertInRange(1, summary.inefficiency, 1.1); + assertInRange(0, summary.waste, 0.01); + } + + { // This setup is not so lucky, and the artificial behaviour pushes it into overload. + int workPerSuccess = 5; + + CustomTimer timer = new CustomTimer(); + DynamicThrottlePolicy policy = new DynamicThrottlePolicy(timer).setMinWindowSize(1) + .setWindowSizeIncrement(0.1) + .setResizeRate(100); + Summary summary = run(operations, workPerSuccess, numberOfWorkers, maximumTasksPerWorker, workerParallelism, timer, policy); + + double maxMaxPending = numberOfWorkers * maximumTasksPerWorker; + assertInRange(maxMaxPending, summary.averagePending, maxMaxPending * 1.1); + assertInRange(maxMaxPending, summary.averageWindows[0], maxMaxPending * 1.1); + assertInRange(1.2, summary.inefficiency, 1.5); + assertInRange(0.5, summary.waste, 1.5); + } + + { // This setup is not so lucky either, and the artificial behaviour keeps it far below a good throughput. + int workPerSuccess = 4; + + CustomTimer timer = new CustomTimer(); + DynamicThrottlePolicy policy = new DynamicThrottlePolicy(timer).setMinWindowSize(1) + .setWindowSizeIncrement(0.1) + .setResizeRate(100); + Summary summary = run(operations, workPerSuccess, numberOfWorkers, maximumTasksPerWorker, workerParallelism, timer, policy); + + double minMaxPending = numberOfWorkers * workerParallelism; + assertInRange(0.3 * minMaxPending, summary.averagePending, 0.5 * minMaxPending); + assertInRange(0.3 * minMaxPending, summary.averageWindows[0], 0.5 * minMaxPending); + assertInRange(2, summary.inefficiency, 4); + assertInRange(0, summary.waste, 0); + } + } + + /** Sort of a dummy test, as the conditions are perfect. In a more realistic scenario, below, the algorithm needs luck to climb this high. */ + @Test + public void singlePolicySingleWorkerWithIncreasingParallelism() { + for (int i = 0; i < 5; i++) { + CustomTimer timer = new CustomTimer(); + DynamicThrottlePolicy policy = new DynamicThrottlePolicy(timer); + int scaleFactor = (int) Math.pow(10, i); + long operations = 3_000 * scaleFactor; + int workPerSuccess = 6; + int numberOfWorkers = 1; + int maximumTasksPerWorker = 100000; + int workerParallelism = scaleFactor; + Summary summary = run(operations, workPerSuccess, numberOfWorkers, maximumTasksPerWorker, workerParallelism, timer, policy); + + double minMaxPending = numberOfWorkers * workerParallelism; + double maxMaxPending = numberOfWorkers * maximumTasksPerWorker; + assertInRange(minMaxPending, summary.averagePending, maxMaxPending); + assertInRange(minMaxPending, summary.averageWindows[0], maxMaxPending); + assertInRange(1, summary.inefficiency, 1 + (5e-5 * scaleFactor)); // Slow ramp-up + assertInRange(0, summary.waste, 0.1); + } + } + + /** A more realistic test, where throughput gradually flattens with increasing window size, and with more variance in throughput. 
*/ + @Test + public void singlePolicyIncreasingWorkersWithNoParallelism() { + for (int i = 0; i < 5; i++) { + CustomTimer timer = new CustomTimer(); + DynamicThrottlePolicy policy = new DynamicThrottlePolicy(timer); + int scaleFactor = (int) Math.pow(10, i); + long operations = 5_000 * scaleFactor; + // workPerSuccess determines the latency of the simulated server, which in turn determines the impact of the + // synthetic attractors of the algorithm, around latencies which give (close to) integer log10(1 / latency). + // With a value of 5, the impact is that the algorithm is pushed upwards slightly above 10k window size, + // which is the optimal choice for the case with 10000 workers. + // Change this to, e.g., 6 and the algorithm fails to climb as high, as the "ideal latency" is obtained at + // a lower latency than what is measured at 10k window size. + // On the other hand, changing it to 4 moves the attractor out of reach for the algorithm, which fails to + // push window size past 2k on its own. + int workPerSuccess = 5; + int numberOfWorkers = scaleFactor; + int maximumTasksPerWorker = 100000; + int workerParallelism = 1; + Summary summary = run(operations, workPerSuccess, numberOfWorkers, maximumTasksPerWorker, workerParallelism, timer, policy); + + double minMaxPending = numberOfWorkers * workerParallelism; + double maxMaxPending = numberOfWorkers * maximumTasksPerWorker; + assertInRange(minMaxPending, summary.averagePending, maxMaxPending); + assertInRange(minMaxPending, summary.averageWindows[0], maxMaxPending); + assertInRange(1, summary.inefficiency, 1 + 0.2 * i); // Even slower ramp-up. + assertInRange(0, summary.waste, 0); + } + } + + @Test + public void twoWeightedPoliciesWithUnboundedTaskQueue() { + for (int i = 0; i < 10; i++) { + long operations = 1_000_000; + int workPerSuccess = 6 + (int) (30 * Math.random()); + int numberOfWorkers = 1 + (int) (10 * Math.random()); + int maximumTasksPerWorker = 100_000; + int workerParallelism = 32; + CustomTimer timer = new CustomTimer(); + DynamicThrottlePolicy policy1 = new DynamicThrottlePolicy(timer); + DynamicThrottlePolicy policy2 = new DynamicThrottlePolicy(timer).setWeight(0.5); + Summary summary = run(operations, workPerSuccess, numberOfWorkers, maximumTasksPerWorker, workerParallelism, timer, policy1, policy2); + + double minMaxPending = numberOfWorkers * workerParallelism; + double maxMaxPending = numberOfWorkers * maximumTasksPerWorker; + assertInRange(minMaxPending, summary.averagePending, maxMaxPending); + // Actual shares are not distributed perfectly proportionally to weights, but close enough.
+ assertInRange(minMaxPending * 0.6, summary.averageWindows[0], maxMaxPending * 0.6); + assertInRange(minMaxPending * 0.4, summary.averageWindows[1], maxMaxPending * 0.4); + assertInRange(1, summary.inefficiency, 1.02); + assertInRange(0, summary.waste, 0); + } + } + + @Test + public void tenPoliciesVeryParallelServerWithShortTaskQueue() { + for (int i = 0; i < 10; i++) { + long operations = 1_000_000; + int workPerSuccess = 6; + int numberOfWorkers = 6; + int maximumTasksPerWorker = 180 + (int) (120 * Math.random()); + int workerParallelism = 60 + (int) (40 * Math.random()); + CustomTimer timer = new CustomTimer(); + int p = 10; + DynamicThrottlePolicy[] policies = IntStream.range(0, p) + .mapToObj(j -> new DynamicThrottlePolicy(timer) + .setWeight((j + 1.0) / p) + .setWindowSizeIncrement(5) + .setMinWindowSize(1)) + .toArray(DynamicThrottlePolicy[]::new); + Summary summary = run(operations, workPerSuccess, numberOfWorkers, maximumTasksPerWorker, workerParallelism, timer, policies); + + double minMaxPending = numberOfWorkers * workerParallelism; + double maxMaxPending = numberOfWorkers * maximumTasksPerWorker; + assertInRange(minMaxPending, summary.averagePending, maxMaxPending); + for (int j = 0; j < p; j++) { + double expectedShare = (j + 1) / (0.5 * p * (p + 1)); + double imperfectionFactor = 1.5; + // Actual shares are not distributed perfectly proportionally to weights, but close enough. + assertInRange(minMaxPending * expectedShare / imperfectionFactor, + summary.averageWindows[j], + maxMaxPending * expectedShare * imperfectionFactor); + } + assertInRange(1.0, summary.inefficiency, 1.05); + assertInRange(0, summary.waste, 0.1); + } + } + + static void assertInRange(double lower, double actual, double upper) { + System.err.printf("%10.4f <= %10.4f <= %10.4f\n", lower, actual, upper); + assertTrue(actual + " should not be smaller than " + lower, lower <= actual); + assertTrue(actual + " should not be greater than " + upper, upper >= actual); + } + + private Summary run(long operations, int workPerSuccess, int numberOfWorkers, int maximumTasksPerWorker, + int workerParallelism, CustomTimer timer, DynamicThrottlePolicy...
policies) { + System.err.printf("\n### Running %d operations of %d ticks each against %d workers with parallelism %d and queue size %d\n", + operations, workPerSuccess, numberOfWorkers, workerParallelism, maximumTasksPerWorker); + + List<Integer> order = IntStream.range(0, policies.length).boxed().collect(toList()); + MockServer resource = new MockServer(workPerSuccess, numberOfWorkers, maximumTasksPerWorker, workerParallelism); + AtomicLong outstanding = new AtomicLong(operations); + AtomicLong errors = new AtomicLong(0); + long ticks = 0; + long totalPending = 0; + double[] windows = new double[policies.length]; + int[] pending = new int[policies.length]; + while (outstanding.get() + resource.pending() > 0) { + Collections.shuffle(order); + for (int j = 0; j < policies.length; j++) { + int i = order.get(j); + DynamicThrottlePolicy policy = policies[i]; + windows[i] += policy.getWindowSize(); + while (policy.canSend(message, pending[i])) { + outstanding.decrementAndGet(); + policy.processMessage(message); + ++pending[i]; + resource.send(successful -> { + --pending[i]; + if (successful) + policy.processReply(success); + else { + errors.incrementAndGet(); + outstanding.incrementAndGet(); + policy.processReply(error); + } + }); + } + } + ++ticks; + totalPending += resource.pending(); + resource.tick(); + ++timer.millis; + } + + for (int i = 0; i < windows.length; i++) + windows[i] /= ticks; + + return new Summary(timer.milliTime() / (workPerSuccess * operations / (double) numberOfWorkers) * workerParallelism, + errors.get() / (double) operations, + totalPending / (double) ticks, + windows); + } + + static class Summary { + final double inefficiency; + final double waste; + final double averagePending; + final double[] averageWindows; + Summary(double inefficiency, double waste, double averagePending, double[] averageWindows) { + this.inefficiency = inefficiency; // Time spent working / minimum time possible + this.waste = waste; // Number of error replies / number of successful replies + this.averagePending = averagePending; // Average number of pending operations in the server + this.averageWindows = averageWindows; // Average number of pending operations per policy + } + } + + /** + * Resource shared between clients with throttle policies, with simulated throughput and efficiency. + * + * The model used for delay per request, and success/error, is derived from four basic attributes: the work (in ticks) + * needed per successful request, the number of workers, the maximum number of tasks queued per worker, and the + * parallelism of each worker, i.e., the four MockServer constructor arguments below. + *

+ * All messages are assumed to get a successful reply unless maximum pending replies is exceeded; when further + * messages arrive, the worker must immediately spend work to reject these before continuing its other work. + * The delay for a message is computed by assigning it to a random worker, and simulating the worker emptying its + * work queue. Since messages are assigned randomly, there will be some variation in delays and maximum throughput. + * The local correlation between the max number of in-flight messages from the client, and its throughput, + * measured as the number of successful replies per time unit, will start out at 1 and decrease gradually, + * eventually turning negative, as workers must spend work effort on failure replies as well. + *

+ * More specifically, a single worker yields a piecewise linear relationship between max pending and throughput — + * throughput first increases linearly with max pending, until saturated, and then remains constant, until overload, + * where it falls sharply. Several such workers together instead yield a throughput curve which gradually flattens + * as it approaches saturation, and also more gradually falls again, as overload is reached on some workers sometimes.

+ */ static class MockServer { + + final Random random = new Random(); + final int workPerSuccess; + final int numberOfWorkers; + final int maximumTaskPerWorker; + final int workerParallelism; + final int[] currentTask; + final List<Deque<Consumer<Boolean>>> outstandingTasks; + int pending = 0; + + MockServer(int workPerSuccess, int numberOfWorkers, int maximumTaskPerWorker, int workerParallelism) { + this.workPerSuccess = workPerSuccess; + this.numberOfWorkers = numberOfWorkers; + this.maximumTaskPerWorker = maximumTaskPerWorker; + this.workerParallelism = workerParallelism; + this.currentTask = new int[numberOfWorkers]; + this.outstandingTasks = IntStream.range(0, numberOfWorkers) + .mapToObj(__ -> new ArrayDeque<Consumer<Boolean>>()) + .collect(toUnmodifiableList()); + } + + /** Performs a tick for each worker. */ + void tick() { + for (int i = 0; i < numberOfWorkers; i++) + tick(i); + } + + private void tick(int worker) { + Deque<Consumer<Boolean>> tasks = outstandingTasks.get(worker); + for (int i = 0; i < Math.min(workerParallelism, tasks.size()); i++) { + if (currentTask[worker] == 0) { + if (tasks.size() > maximumTaskPerWorker) { + tasks.pop().accept(false); + continue; // Spend work to signal failure to one excess task. + } + currentTask[worker] = workPerSuccess; // Start work on next task. + } + if (--currentTask[worker] == 0) + tasks.poll().accept(true); // Signal success to the completed task. + } + } + + void send(Consumer<Boolean> replyHandler) { + ++pending; + outstandingTasks.get(random.nextInt(numberOfWorkers)) + .addLast(outcome -> { --pending; replyHandler.accept(outcome); }); + } + + int pending() { return pending; } + + } + +} \ No newline at end of file -- cgit v1.2.3
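Finally, a minimal sketch of how a client drives this policy, mirroring the send/reply loop in the test's run() method above; the transport (sendToServer) is a hypothetical stand-in, and only canSend, processMessage and processReply are calls that appear in this patch:

class ThrottleSketch {

    int pendingCount = 0;

    void pump(DynamicThrottlePolicy policy, Message message) {
        while (policy.canSend(message, pendingCount)) { // ask the policy before each send
            policy.processMessage(message);             // record the send
            ++pendingCount;
            sendToServer(message, reply -> {            // hand off to some asynchronous transport
                --pendingCount;
                policy.processReply(reply);             // feed the reply back so the window can adapt
            });
        }
    }

    void sendToServer(Message message, java.util.function.Consumer<Reply> replyHandler) {
        // Hypothetical: pass the message to a transport which eventually invokes replyHandler with the reply.
    }

}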