Diffstat (limited to 'messagebus/src/test/java/com/yahoo/messagebus/DynamicThrottlePolicyTest.java')
-rw-r--r--    messagebus/src/test/java/com/yahoo/messagebus/DynamicThrottlePolicyTest.java    353
1 file changed, 353 insertions, 0 deletions
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/DynamicThrottlePolicyTest.java b/messagebus/src/test/java/com/yahoo/messagebus/DynamicThrottlePolicyTest.java
new file mode 100644
index 00000000000..7f48edfd48c
--- /dev/null
+++ b/messagebus/src/test/java/com/yahoo/messagebus/DynamicThrottlePolicyTest.java
@@ -0,0 +1,353 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus;
+
+import com.yahoo.messagebus.test.SimpleMessage;
+import com.yahoo.messagebus.test.SimpleReply;
+import org.junit.Test;
+
+import java.util.ArrayDeque;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+import java.util.stream.IntStream;
+
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toUnmodifiableList;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * These tests are based on a simulated server, the {@link MockServer} below.
+ * The purpose is both to verify the behaviour of the algorithm and to provide a playground
+ * for development, tuning, etc.
+ *
+ * @author jonmv
+ */
+public class DynamicThrottlePolicyTest {
+
+ static final Message message = new SimpleMessage("message");
+ static final Reply success = new SimpleReply("success");
+ static final Reply error = new SimpleReply("error");
+ static {
+ success.setContext(message.getApproxSize());
+ error.setContext(message.getApproxSize());
+ error.addError(new Error(0, "overload"));
+ }
+
+ @Test
+ public void singlePolicyWithSmallWindows() {
+ long operations = 1_000_000;
+ int numberOfWorkers = 1;
+ int maximumTasksPerWorker = 16;
+ int workerParallelism = 12;
+
+ { // This setup is lucky with the artificial local maxima for latency, and gives good results. See below for counter-examples.
+ int workPerSuccess = 8;
+
+ CustomTimer timer = new CustomTimer();
+ DynamicThrottlePolicy policy = new DynamicThrottlePolicy(timer).setMinWindowSize(1)
+ .setWindowSizeIncrement(0.1)
+ .setResizeRate(100);
+ Summary summary = run(operations, workPerSuccess, numberOfWorkers, maximumTasksPerWorker, workerParallelism, timer, policy);
+
+ double minMaxPending = numberOfWorkers * workerParallelism;
+ double maxMaxPending = numberOfWorkers * maximumTasksPerWorker;
+ System.err.println(operations / (double) timer.milliTime());
+ assertInRange(minMaxPending, summary.averagePending, maxMaxPending);
+ assertInRange(minMaxPending, summary.averageWindows[0], maxMaxPending);
+ assertInRange(1, summary.inefficiency, 1.1);
+ assertInRange(0, summary.waste, 0.01);
+ }
+
+ { // This setup is not so lucky, and the artificial behaviour pushes it into overload.
+ int workPerSuccess = 5;
+
+ CustomTimer timer = new CustomTimer();
+ DynamicThrottlePolicy policy = new DynamicThrottlePolicy(timer).setMinWindowSize(1)
+ .setWindowSizeIncrement(0.1)
+ .setResizeRate(100);
+ Summary summary = run(operations, workPerSuccess, numberOfWorkers, maximumTasksPerWorker, workerParallelism, timer, policy);
+
+ double maxMaxPending = numberOfWorkers * maximumTasksPerWorker;
+ assertInRange(maxMaxPending, summary.averagePending, maxMaxPending * 1.1);
+ assertInRange(maxMaxPending, summary.averageWindows[0], maxMaxPending * 1.1);
+ assertInRange(1.2, summary.inefficiency, 1.5);
+ assertInRange(0.5, summary.waste, 1.5);
+ }
+
+ { // This setup is not so lucky either, and the artificial behaviour keeps it far below a good throughput.
+ int workPerSuccess = 4;
+
+ CustomTimer timer = new CustomTimer();
+ DynamicThrottlePolicy policy = new DynamicThrottlePolicy(timer).setMinWindowSize(1)
+ .setWindowSizeIncrement(0.1)
+ .setResizeRate(100);
+ Summary summary = run(operations, workPerSuccess, numberOfWorkers, maximumTasksPerWorker, workerParallelism, timer, policy);
+
+ double minMaxPending = numberOfWorkers * workerParallelism;
+ assertInRange(0.3 * minMaxPending, summary.averagePending, 0.5 * minMaxPending);
+ assertInRange(0.3 * minMaxPending, summary.averageWindows[0], 0.5 * minMaxPending);
+ assertInRange(2, summary.inefficiency, 4);
+ assertInRange(0, summary.waste, 0);
+ }
+ }
+
+ /** Sort of a dummy test, as the conditions are perfect. In a more realistic scenario, below, the algorithm needs luck to climb this high. */
+ @Test
+ public void singlePolicySingleWorkerWithIncreasingParallelism() {
+ for (int i = 0; i < 5; i++) {
+ CustomTimer timer = new CustomTimer();
+ DynamicThrottlePolicy policy = new DynamicThrottlePolicy(timer);
+ int scaleFactor = (int) Math.pow(10, i);
+ long operations = 3_000 * scaleFactor;
+ int workPerSuccess = 6;
+ int numberOfWorkers = 1;
+ int maximumTasksPerWorker = 100000;
+ int workerParallelism = scaleFactor;
+ Summary summary = run(operations, workPerSuccess, numberOfWorkers, maximumTasksPerWorker, workerParallelism, timer, policy);
+
+ double minMaxPending = numberOfWorkers * workerParallelism;
+ double maxMaxPending = numberOfWorkers * maximumTasksPerWorker;
+ assertInRange(minMaxPending, summary.averagePending, maxMaxPending);
+ assertInRange(minMaxPending, summary.averageWindows[0], maxMaxPending);
+ assertInRange(1, summary.inefficiency, 1 + (5e-5 * scaleFactor)); // Slow ramp-up
+ assertInRange(0, summary.waste, 0.1);
+ }
+ }
+
+ /** A more realistic test, where throughput gradually flattens with increasing window size, and with more variance in throughput. */
+ @Test
+ public void singlePolicyIncreasingWorkersWithNoParallelism() {
+ for (int i = 0; i < 5; i++) {
+ CustomTimer timer = new CustomTimer();
+ DynamicThrottlePolicy policy = new DynamicThrottlePolicy(timer);
+ int scaleFactor = (int) Math.pow(10, i);
+ long operations = 5_000 * scaleFactor;
+ // workPerSuccess determines the latency of the simulated server, which in turn determines the impact of the
+ // synthetic attractors of the algorithm, around latencies which give (close to) integer log10(1 / latency).
+ // With a value of 5, the impact is that the algorithm is pushed slightly above a 10k window size,
+ // which is the optimal choice for the case with 10000 workers.
+ // Change this to, e.g., 6, and the algorithm fails to climb as high, as the "ideal latency" then lies at a
+ // lower latency than what is measured at a 10k window size.
+ // On the other hand, changing it to 4 moves the attractor out of reach of the algorithm, which then fails to
+ // push the window size past 2k on its own.
+ int workPerSuccess = 5;
+ int numberOfWorkers = scaleFactor;
+ int maximumTasksPerWorker = 100000;
+ int workerParallelism = 1;
+ Summary summary = run(operations, workPerSuccess, numberOfWorkers, maximumTasksPerWorker, workerParallelism, timer, policy);
+
+ double minMaxPending = numberOfWorkers * workerParallelism;
+ double maxMaxPending = numberOfWorkers * maximumTasksPerWorker;
+ assertInRange(minMaxPending, summary.averagePending, maxMaxPending);
+ assertInRange(minMaxPending, summary.averageWindows[0], maxMaxPending);
+ assertInRange(1, summary.inefficiency, 1 + 0.2 * i); // Even slower ramp-up.
+ assertInRange(0, summary.waste, 0);
+ }
+ }
+
+ @Test
+ public void twoWeightedPoliciesWithUnboundedTaskQueue() {
+ for (int i = 0; i < 10; i++) {
+ long operations = 1_000_000;
+ int workPerSuccess = 6 + (int) (30 * Math.random());
+ int numberOfWorkers = 1 + (int) (10 * Math.random());
+ int maximumTasksPerWorker = 100_000;
+ int workerParallelism = 32;
+ CustomTimer timer = new CustomTimer();
+ DynamicThrottlePolicy policy1 = new DynamicThrottlePolicy(timer);
+ DynamicThrottlePolicy policy2 = new DynamicThrottlePolicy(timer).setWeight(0.5);
+ Summary summary = run(operations, workPerSuccess, numberOfWorkers, maximumTasksPerWorker, workerParallelism, timer, policy1, policy2);
+
+ double minMaxPending = numberOfWorkers * workerParallelism;
+ double maxMaxPending = numberOfWorkers * maximumTasksPerWorker;
+ assertInRange(minMaxPending, summary.averagePending, maxMaxPending);
+ // Actual shares are not distributed perfectly proportionally to weights, but close enough.
+ assertInRange(minMaxPending * 0.6, summary.averageWindows[0], maxMaxPending * 0.6);
+ assertInRange(minMaxPending * 0.4, summary.averageWindows[1], maxMaxPending * 0.4);
+ assertInRange(1, summary.inefficiency, 1.02);
+ assertInRange(0, summary.waste, 0);
+ }
+ }
+
+ @Test
+ public void tenPoliciesVeryParallelServerWithShortTaskQueue() {
+ for (int i = 0; i < 10; i++) {
+ long operations = 1_000_000;
+ int workPerSuccess = 6;
+ int numberOfWorkers = 6;
+ int maximumTasksPerWorker = 180 + (int) (120 * Math.random());
+ int workerParallelism = 60 + (int) (40 * Math.random());
+ CustomTimer timer = new CustomTimer();
+ int p = 10;
+ DynamicThrottlePolicy[] policies = IntStream.range(0, p)
+ .mapToObj(j -> new DynamicThrottlePolicy(timer)
+ .setWeight((j + 1.0) / p)
+ .setWindowSizeIncrement(5)
+ .setMinWindowSize(1))
+ .toArray(DynamicThrottlePolicy[]::new);
+ Summary summary = run(operations, workPerSuccess, numberOfWorkers, maximumTasksPerWorker, workerParallelism, timer, policies);
+
+ double minMaxPending = numberOfWorkers * workerParallelism;
+ double maxMaxPending = numberOfWorkers * maximumTasksPerWorker;
+ assertInRange(minMaxPending, summary.averagePending, maxMaxPending);
+ for (int j = 0; j < p; j++) {
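+ // Each policy's expected share is its weight divided by the sum of all weights: (j + 1) / (1 + 2 + ... + p).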
+ double expectedShare = (j + 1) / (0.5 * p * (p + 1));
+ double imperfectionFactor = 1.5;
+ // Actual shares are not distributed perfectly proportionally to weights, but close enough.
+ assertInRange(minMaxPending * expectedShare / imperfectionFactor,
+ summary.averageWindows[j],
+ maxMaxPending * expectedShare * imperfectionFactor);
+ }
+ assertInRange(1.0, summary.inefficiency, 1.05);
+ assertInRange(0, summary.waste, 0.1);
+ }
+ }
+
+ static void assertInRange(double lower, double actual, double upper) {
+ System.err.printf("%10.4f <= %10.4f <= %10.4f\n", lower, actual, upper);
+ assertTrue(actual + " should not be smaller than " + lower, lower <= actual);
+ assertTrue(actual + " should not be greater than " + upper, upper >= actual);
+ }
+
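+ /**
+ * Runs the given number of operations against a {@link MockServer} with the given parameters, shared by the
+ * given policies, and returns a {@link Summary} of the simulation.
+ */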
+ private Summary run(long operations, int workPerSuccess, int numberOfWorkers, int maximumTasksPerWorker,
+ int workerParallelism, CustomTimer timer, DynamicThrottlePolicy... policies) {
+ System.err.printf("\n### Running %d operations of %d ticks each against %d workers with parallelism %d and queue size %d\n",
+ operations, workPerSuccess, numberOfWorkers, workerParallelism, maximumTasksPerWorker);
+
+ List<Integer> order = IntStream.range(0, policies.length).boxed().collect(toList());
+ MockServer resource = new MockServer(workPerSuccess, numberOfWorkers, maximumTasksPerWorker, workerParallelism);
+ AtomicLong outstanding = new AtomicLong(operations);
+ AtomicLong errors = new AtomicLong(0);
+ long ticks = 0;
+ long totalPending = 0;
+ double[] windows = new double[policies.length];
+ int[] pending = new int[policies.length];
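+ // Each iteration below is one simulated millisecond: the policies, in random order, send as much as their
+ // windows allow, then the server performs one tick of work and the timer advances.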
+ while (outstanding.get() + resource.pending() > 0) {
+ Collections.shuffle(order);
+ for (int j = 0; j < policies.length; j++) {
+ int i = order.get(j);
+ DynamicThrottlePolicy policy = policies[i];
+ windows[i] += policy.getWindowSize();
+ while (policy.canSend(message, pending[i])) {
+ outstanding.decrementAndGet();
+ policy.processMessage(message);
+ ++pending[i];
+ resource.send(successful -> {
+ --pending[i];
+ if (successful)
+ policy.processReply(success);
+ else {
+ errors.incrementAndGet();
+ outstanding.incrementAndGet();
+ policy.processReply(error);
+ }
+ });
+ }
+ }
+ ++ticks;
+ totalPending += resource.pending();
+ resource.tick();
+ ++timer.millis;
+ }
+
+ for (int i = 0; i < windows.length; i++)
+ windows[i] /= ticks;
+
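+ // Ideal run time is all work spread evenly across the workers at full parallelism, i.e.,
+ // operations * workPerSuccess / (numberOfWorkers * workerParallelism) ticks; inefficiency is actual / ideal.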
+ return new Summary(timer.milliTime() / (workPerSuccess * operations / (double) numberOfWorkers) * workerParallelism,
+ errors.get() / (double) operations,
+ totalPending / (double) ticks,
+ windows);
+ }
+
+ static class Summary {
+ final double inefficiency;
+ final double waste;
+ final double averagePending;
+ final double[] averageWindows;
+ Summary(double inefficiency, double waste, double averagePending, double[] averageWindows) {
+ this.inefficiency = inefficiency; // Time spent working / minimum time possible
+ this.waste = waste; // Number of error replies / number of successful replies
+ this.averagePending = averagePending; // Average number of pending operations in the server
+ this.averageWindows = averageWindows; // Average number of pending operations per policy
+ }
+ }
+
+ /**
+ * Resource shared between clients with throttle policies, with simulated throughput and efficiency.
+ *
+ * The model used for delay per request, and success/error, is derived from four basic attributes:
+ * <ul>
+ * <li>Ratio between work per successful and per failed reply</li>
+ * <li>Number of workers, each with minimum throughput equal to one failed reply per tick</li>
+ * <li>Parallelism of each worker — throughput increases linearly with queued tasks up to this number</li>
+ * <li>Maximum number of queued tasks per worker</li>
+ * </ul>
+ * <p>All messages are assumed to get a successful reply unless maximum pending replies is exceeded; when further
+ * messages arrive, the worker must immediately spend work to reject these before continuing its other work.
+ * The delay for a message is computed by assigning it to a random worker, and simulating the worker emptying its
+ * work queue. Since messages are assigned randomly, there will be some variation in delays and maximum throughput.
+ * The local correlation between max number of in-flight messages from the client, and its throughput,
+ * measured as number of successful replies per time unit, will start out at 1 and decrease gradually,
+ * eventually turning negative, as workers must spend work effort on failure replies as well.</p>
+ * <p> More specifically, a single worker yields a piecewise linear relationship between max pending and throughput —
+ * throughput first increases linearly with max pending, until saturated, and then remains constant, until overload,
+ * where it falls sharply. Several such workers together instead yield a throughput curve which gradually flattens
+ * as it approaches saturation, and also more gradually falls again, as overload is reached on some workers sometimes.</p>
+ */
+ static class MockServer {
+
+ final Random random = new Random();
+ final int workPerSuccess;
+ final int numberOfWorkers;
+ final int maximumTaskPerWorker;
+ final int workerParallelism;
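+ // Remaining work units for the task currently being processed by each worker.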
+ final int[] currentTask;
+ final List<Deque<Consumer<Boolean>>> outstandingTasks;
+ int pending = 0;
+
+ MockServer(int workPerSuccess, int numberOfWorkers, int maximumTaskPerWorker, int workerParallelism) {
+ this.workPerSuccess = workPerSuccess;
+ this.numberOfWorkers = numberOfWorkers;
+ this.maximumTaskPerWorker = maximumTaskPerWorker;
+ this.workerParallelism = workerParallelism;
+ this.currentTask = new int[numberOfWorkers];
+ this.outstandingTasks = IntStream.range(0, numberOfWorkers)
+ .mapToObj(__ -> new ArrayDeque<Consumer<Boolean>>())
+ .collect(toUnmodifiableList());
+ }
+
+ /** Performs one tick of work for each worker. */
+ void tick() {
+ for (int i = 0; i < numberOfWorkers; i++)
+ tick(i);
+ }
+
+ private void tick(int worker) {
+ Deque<Consumer<Boolean>> tasks = outstandingTasks.get(worker);
+ for (int i = 0; i < Math.min(workerParallelism, tasks.size()); i++) {
+ if (currentTask[worker] == 0) {
+ if (tasks.size() > maximumTaskPerWorker) {
+ tasks.pop().accept(false);
+ continue; // Spend work to signal failure to one excess task.
+ }
+ currentTask[worker] = workPerSuccess; // Start work on next task.
+ }
+ if (--currentTask[worker] == 0)
+ tasks.poll().accept(true); // Signal success to the completed task.
+ }
+ }
+
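+ /** Assigns the task to a random worker; the handler is invoked with the outcome when the task completes or is rejected. */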
+ void send(Consumer<Boolean> replyHandler) {
+ ++pending;
+ outstandingTasks.get(random.nextInt(numberOfWorkers))
+ .addLast(outcome -> { --pending; replyHandler.accept(outcome); });
+ }
+
+ int pending() { return pending; }
+
+ }
+
+}
\ No newline at end of file