authorJon Marius Venstad <jonmv@users.noreply.github.com>2020-12-03 15:51:10 +0100
committerGitHub <noreply@github.com>2020-12-03 15:51:10 +0100
commita41bfa1b7c7e02ee90de60ce19a7fb199998bb89 (patch)
treee8b04c514c5bf0505efc2e37cb9635ff4d12d824 /messagebus
parent07dd588fbeb942582d3f8d5cd5950d0a5851161e (diff)
parent3abdb35128ce98f2dd0cf2dfeb36494cf8a60687 (diff)
Merge pull request #15603 from vespa-engine/jonmv/floating-point-window-size
jonmv/document and test dynamic throttling policy
Diffstat (limited to 'messagebus')
-rw-r--r--  messagebus/src/main/java/com/yahoo/messagebus/DynamicThrottlePolicy.java     |  79
-rw-r--r--  messagebus/src/test/java/com/yahoo/messagebus/DynamicThrottlePolicyTest.java | 353
2 files changed, 412 insertions(+), 20 deletions(-)
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/DynamicThrottlePolicy.java b/messagebus/src/main/java/com/yahoo/messagebus/DynamicThrottlePolicy.java
index 93c63112b46..ab09c9f5720 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/DynamicThrottlePolicy.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/DynamicThrottlePolicy.java
@@ -8,11 +8,43 @@ import java.util.logging.Logger;
/**
* This is an implementation of the {@link ThrottlePolicy} that offers dynamic limits to the number of pending messages a
- * {@link SourceSession} is allowed to have.
- *
- * <b>NOTE:</b> By context, "pending" is referring to the number of sent messages that have not been replied to yet.
+ * {@link SourceSession} is allowed to have. Pending means the number of sent messages that have not been replied to yet.
+ * <p>
+ * The algorithm works by increasing the number of messages allowed to be pending, the <em>window size</em>, until
+ * this no longer increases throughput. At this point, the algorithm is driven by synthetic attraction towards latencies
+ * which satisfy <code>log10(1 / latency) % 1 = e</code>, for some constant <code>0 &lt; e &lt; 1</code>. Weird? Most certainly!
+ * </p><p>
+ * The effect is that the window size the algorithm produces, for a saturated ideal server, has a level for each power
+ * of ten with an attractor the window size tends towards while on this level, determined by the <code>e</code> above.
+ * The {@link #setEfficiencyThreshold} determines the <code>e</code> constant. When <code>e</code> is set so the
+ * attractor is close to the start of the interval, this has an inhibiting effect on the algorithm, and it is basically
+ * reduced to "increase window size until this no longer increases throughput enough that it defeats the random noise".
+ * As the attractor moves towards the end of the intervals, the algorithm becomes increasingly eager in increasing
+ * its window size past what it measures as effective — if moved to the very end of the interval, the algorithm explodes.
+ * The default setting has the attractor at <code>log10(2)</code> of the way from start to end of these levels.
+ * </p><p>
+ * Because the algorithm stops increasing the window size when increases in throughput drown in random variations, the
+ * {@link #setWindowSizeIncrement} directly influences the efficient work domain of the algorithm. With the default
+ * setting of <code>20</code>, it seems to struggle to increase past window sizes of a couple thousand. Using a static
+ * increment (and a backoff factor) seems to be necessary to effectively balance the load that different, competing
+ * policies are allowed to produce.
+ * </p><p>
+ * When there are multiple policies that compete against each other, they will increase load until saturating the server.
+ * If all policies but one were kept fixed, that one policy would still see an increase in throughput with increased
+ * window size, even with a saturated server, as it would be given a greater share of the server's resources. However,
+ * since all algorithms adjust their windows concurrently, they will all reduce the throughput of the other algorithms.
+ * The result is that they commonly see the server as saturated, and commonly reach the behaviour where some increases in
+ * window size lead to measurable throughput gains, while others don't.
+ * </p><p>
+ * Now the weighting ({@link #setWeight}) comes into play: with equal weights, two algorithms would have a break-even
+ * between being governed by the attractors (above), which eventually limits window size, and increases due to positive
+ * measurements, at the same point along the window size axis. With smaller weights, i.e., smaller increases to window
+ * size, this break-even occurs where the curve is steeper, i.e., where the client has a smaller share of the server.
+ * Thus, competing algorithms with different weights end up with a resource distribution roughly proportional to weight.
+ * </p>
*
* @author Simon Thoresen Hult
+ * @author jonmv
*/
public class DynamicThrottlePolicy extends StaticThrottlePolicy {
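For intuition, here is a minimal sketch, not part of this patch, of the attractor relation described in the javadoc above; the latency value is hypothetical, and the unit of latency only shifts the result by an integer, so the fractional part is unaffected:

    double latency = 0.004;                        // hypothetical: 4 ms, in seconds
    double position = Math.log10(1 / latency) % 1; // where on its level this latency sits, ~0.398
    double attractor = Math.log10(2);              // the default e, ~0.301
    System.out.println(position + " is pulled towards " + attractor);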
@@ -23,7 +55,7 @@ public class DynamicThrottlePolicy extends StaticThrottlePolicy {
private double resizeRate = 3;
private long resizeTime = 0;
private long timeOfLastMessage;
- private double efficiencyThreshold = 1.0;
+ private double efficiencyThreshold = 1;
private double windowSizeIncrement = 20;
private double windowSize = windowSizeIncrement;
private double minWindowSize = windowSizeIncrement;
@@ -77,7 +109,8 @@ public class DynamicThrottlePolicy extends StaticThrottlePolicy {
}
timeOfLastMessage = time;
int windowSizeFloored = (int) windowSize;
- boolean carry = numSent < (windowSize * resizeRate) * windowSize - windowSizeFloored;
+ // Use floating point window sizes, so the algorithm sees the difference between window sizes of 1.1 and 1.9.
+ boolean carry = numSent < (windowSize * resizeRate) * (windowSize - windowSizeFloored);
return pendingCount < windowSizeFloored + (carry ? 1 : 0);
}
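A minimal sketch, not part of this patch, of how the fractional part of the window size plays out across a resize period of roughly windowSize * resizeRate sends; numSent stands in for the counter that is reset on each resize:

    double windowSize = 1.9, resizeRate = 3;
    int floored = (int) windowSize;                // 1
    for (int numSent = 0; numSent < 7; numSent++) {
        boolean carry = numSent < (windowSize * resizeRate) * (windowSize - floored);
        System.out.println(numSent + " sent: pending may reach " + (floored + (carry ? 1 : 0)));
    }
    // Grants the extra slot for numSent 0..5 and withholds it at 6, so the effective
    // window is close to 1.9 rather than the floored 1, in proportion to the fraction 0.9.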
@@ -96,30 +129,30 @@ public class DynamicThrottlePolicy extends StaticThrottlePolicy {
numSent = 0;
numOk = 0;
-
if (maxThroughput > 0 && throughput > maxThroughput * 0.95) {
// No need to increase window when we're this close to max.
- } else if (throughput >= localMaxThroughput) {
+ // TODO jonmv: Not so sure — what if we're too high, and should back off?
+ } else if (throughput > localMaxThroughput) {
localMaxThroughput = throughput;
- windowSize += weight*windowSizeIncrement;
+ windowSize += weight * windowSizeIncrement;
if (log.isLoggable(Level.FINE)) {
log.log(Level.FINE, "windowSize " + windowSize + " throughput " + throughput + " local max " + localMaxThroughput);
}
} else {
// scale up/down throughput for comparing to window size
double period = 1;
- while(throughput * period/windowSize < 2) {
+ while(throughput * period / windowSize < 2) {
period *= 10;
}
- while(throughput * period/windowSize > 2) {
+ while(throughput * period / windowSize > 2) {
period *= 0.1;
}
- double efficiency = throughput*period/windowSize;
+ double efficiency = throughput * period / windowSize; // "efficiency" is a strange name; this is where on the current level the window size sits.
if (efficiency < efficiencyThreshold) {
windowSize = Math.min(windowSize * windowSizeBackOff, windowSize - decrementFactor * windowSizeIncrement);
localMaxThroughput = 0;
} else {
- windowSize += weight*windowSizeIncrement;
+ windowSize += weight * windowSizeIncrement;
}
if (log.isLoggable(Level.FINE)) {
log.log(Level.FINE, "windowSize " + windowSize + " throughput " + throughput + " local max " + localMaxThroughput + " efficiency " + efficiency);
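A minimal sketch, not part of this patch, of the scaling above: throughput is multiplied by a power of ten until the ratio lands in (0.2, 2], which places the current window size somewhere on its level; the values are hypothetical:

    double throughput = 1.0, windowSize = 100;     // hypothetical: replies per ms, current window
    double period = 1;
    while (throughput * period / windowSize < 2) period *= 10;
    while (throughput * period / windowSize > 2) period *= 0.1;
    double efficiency = throughput * period / windowSize; // 1.0 for these values
    // With the default efficiencyThreshold of 1, a ratio of 1.0 is not below the
    // threshold, so the window keeps growing; anything below 1 would trigger back-off.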
@@ -138,6 +171,11 @@ public class DynamicThrottlePolicy extends StaticThrottlePolicy {
}
/**
+ * Determines where on each latency level the attractor sits. 2 is at the very end, and makes this go *boom*.
+ * 0.2 is at the very start, and makes the algorithm more conservative. Probably fine to stay away from this.
+ */
+ // Original javadoc is nonsense, but kept for historical reasons.
+ /*
* Sets the lower efficiency threshold at which the algorithm should perform window size back off. Efficiency is
* the correlation between throughput and window size. The algorithm will increase the window size until efficiency
* drops below the efficiency of the local maxima times this value.
@@ -176,8 +214,7 @@ public class DynamicThrottlePolicy extends StaticThrottlePolicy {
/**
* Sets the factor of window size to back off to when the algorithm determines that efficiency is not increasing.
- * A value of 1 means that there is no back off from the local maxima, and means that the algorithm will fail to
- * reduce window size to something lower than a previous maxima. This value is capped to the [0, 1] range.
+ * Capped to [0, 1].
*
* @param windowSizeBackOff the back off to set
* @return this, to allow chaining
@@ -189,20 +226,20 @@ public class DynamicThrottlePolicy extends StaticThrottlePolicy {
/**
* Sets the rate at which the window size is updated. The larger the value, the less responsive the resizing
- * becomes. However, the smaller the value, the less accurate the measurements become.
+ * becomes. However, the smaller the value, the less accurate the measurements become. Capped to [2, ∞).
*
* @param resizeRate the rate to set
* @return this, to allow chaining
*/
public DynamicThrottlePolicy setResizeRate(double resizeRate) {
- this.resizeRate = resizeRate;
+ this.resizeRate = Math.max(2, resizeRate);
return this;
}
/**
* Sets the weight for this client. The larger the value, the more resources
- * will be allocated to this clients. Resources are shared between clients
- * proportionally to the set weights.
+ * will be allocated to this client. Resources are shared between clients roughly
+ * proportionally to the set weights. Must be a positive number.
*
* @param weight the weight to set
* @return this, to allow chaining
@@ -213,7 +250,7 @@ public class DynamicThrottlePolicy extends StaticThrottlePolicy {
}
/**
- * Sets the maximium number of pending operations allowed at any time, in
+ * Sets the maximum number of pending operations allowed at any time, in
* order to avoid using too much resources.
*
* @param max the max to set
@@ -235,7 +272,7 @@ public class DynamicThrottlePolicy extends StaticThrottlePolicy {
/**
- * Sets the minimium number of pending operations allowed at any time, in
+ * Sets the minimum number of pending operations allowed at any time, in
* order to keep a level of performance.
*
* @param min the min to set
@@ -272,4 +309,6 @@ public class DynamicThrottlePolicy extends StaticThrottlePolicy {
return (int) windowSize;
}
+ double getWindowSize() { return windowSize; }
+
}
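For reference, a minimal usage sketch, not part of this patch; it assumes the no-argument constructor and the usual messagebus wiring where the policy is handed to a source session through SourceSessionParams, and all parameter values here are hypothetical:

    DynamicThrottlePolicy policy = new DynamicThrottlePolicy()
            .setWeight(0.5)              // hypothetical: half the share of a weight-1 client
            .setWindowSizeIncrement(20)  // the default; governs how far the window can climb
            .setWindowSizeBackOff(0.9)
            .setResizeRate(3)
            .setMinWindowSize(1);
    SourceSessionParams params = new SourceSessionParams().setThrottlePolicy(policy);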
diff --git a/messagebus/src/test/java/com/yahoo/messagebus/DynamicThrottlePolicyTest.java b/messagebus/src/test/java/com/yahoo/messagebus/DynamicThrottlePolicyTest.java
new file mode 100644
index 00000000000..7f48edfd48c
--- /dev/null
+++ b/messagebus/src/test/java/com/yahoo/messagebus/DynamicThrottlePolicyTest.java
@@ -0,0 +1,353 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus;
+
+import com.yahoo.messagebus.test.SimpleMessage;
+import com.yahoo.messagebus.test.SimpleReply;
+import org.junit.Test;
+
+import java.util.ArrayDeque;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+import java.util.stream.IntStream;
+
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toUnmodifiableList;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * These tests are based on a simulated server, the {@link MockServer} below.
+ * The purpose is to both verify the behaviour of the algorithm, while also providing a playground
+ * for development, tuning, etc.
+ *
+ * @author jonmv
+ */
+public class DynamicThrottlePolicyTest {
+
+ static final Message message = new SimpleMessage("message");
+ static final Reply success = new SimpleReply("success");
+ static final Reply error = new SimpleReply("error");
+ static {
+ success.setContext(message.getApproxSize());
+ error.setContext(message.getApproxSize());
+ error.addError(new Error(0, "overload"));
+ }
+
+ @Test
+ public void singlePolicyWithSmallWindows() {
+ long operations = 1_000_000;
+ int numberOfWorkers = 1;
+ int maximumTasksPerWorker = 16;
+ int workerParallelism = 12;
+
+ { // This setup is lucky with the artificial local maxima for latency, and gives good results. See below for counter-examples.
+ int workPerSuccess = 8;
+
+ CustomTimer timer = new CustomTimer();
+ DynamicThrottlePolicy policy = new DynamicThrottlePolicy(timer).setMinWindowSize(1)
+ .setWindowSizeIncrement(0.1)
+ .setResizeRate(100);
+ Summary summary = run(operations, workPerSuccess, numberOfWorkers, maximumTasksPerWorker, workerParallelism, timer, policy);
+
+ double minMaxPending = numberOfWorkers * workerParallelism;
+ double maxMaxPending = numberOfWorkers * maximumTasksPerWorker;
+ System.err.println(operations / (double) timer.milliTime());
+ assertInRange(minMaxPending, summary.averagePending, maxMaxPending);
+ assertInRange(minMaxPending, summary.averageWindows[0], maxMaxPending);
+ assertInRange(1, summary.inefficiency, 1.1);
+ assertInRange(0, summary.waste, 0.01);
+ }
+
+ { // This setup is not so lucky, and the artificial behaviour pushes it into overload.
+ int workPerSuccess = 5;
+
+ CustomTimer timer = new CustomTimer();
+ DynamicThrottlePolicy policy = new DynamicThrottlePolicy(timer).setMinWindowSize(1)
+ .setWindowSizeIncrement(0.1)
+ .setResizeRate(100);
+ Summary summary = run(operations, workPerSuccess, numberOfWorkers, maximumTasksPerWorker, workerParallelism, timer, policy);
+
+ double maxMaxPending = numberOfWorkers * maximumTasksPerWorker;
+ assertInRange(maxMaxPending, summary.averagePending, maxMaxPending * 1.1);
+ assertInRange(maxMaxPending, summary.averageWindows[0], maxMaxPending * 1.1);
+ assertInRange(1.2, summary.inefficiency, 1.5);
+ assertInRange(0.5, summary.waste, 1.5);
+ }
+
+ { // This setup is not so lucky either, and the artificial behaviour keeps it far below a good throughput.
+ int workPerSuccess = 4;
+
+ CustomTimer timer = new CustomTimer();
+ DynamicThrottlePolicy policy = new DynamicThrottlePolicy(timer).setMinWindowSize(1)
+ .setWindowSizeIncrement(0.1)
+ .setResizeRate(100);
+ Summary summary = run(operations, workPerSuccess, numberOfWorkers, maximumTasksPerWorker, workerParallelism, timer, policy);
+
+ double minMaxPending = numberOfWorkers * workerParallelism;
+ assertInRange(0.3 * minMaxPending, summary.averagePending, 0.5 * minMaxPending);
+ assertInRange(0.3 * minMaxPending, summary.averageWindows[0], 0.5 * minMaxPending);
+ assertInRange(2, summary.inefficiency, 4);
+ assertInRange(0, summary.waste, 0);
+ }
+ }
+
+ /** Sort of a dummy test, as the conditions are perfect. In a more realistic scenario, below, the algorithm needs luck to climb this high. */
+ @Test
+ public void singlePolicySingleWorkerWithIncreasingParallelism() {
+ for (int i = 0; i < 5; i++) {
+ CustomTimer timer = new CustomTimer();
+ DynamicThrottlePolicy policy = new DynamicThrottlePolicy(timer);
+ int scaleFactor = (int) Math.pow(10, i);
+ long operations = 3_000 * scaleFactor;
+ int workPerSuccess = 6;
+ int numberOfWorkers = 1;
+ int maximumTasksPerWorker = 100000;
+ int workerParallelism = scaleFactor;
+ Summary summary = run(operations, workPerSuccess, numberOfWorkers, maximumTasksPerWorker, workerParallelism, timer, policy);
+
+ double minMaxPending = numberOfWorkers * workerParallelism;
+ double maxMaxPending = numberOfWorkers * maximumTasksPerWorker;
+ assertInRange(minMaxPending, summary.averagePending, maxMaxPending);
+ assertInRange(minMaxPending, summary.averageWindows[0], maxMaxPending);
+ assertInRange(1, summary.inefficiency, 1 + (5e-5 * scaleFactor)); // Slow ramp-up
+ assertInRange(0, summary.waste, 0.1);
+ }
+ }
+
+ /** A more realistic test, where throughput gradually flattens with increasing window size, and with more variance in throughput. */
+ @Test
+ public void singlePolicyIncreasingWorkersWithNoParallelism() {
+ for (int i = 0; i < 5; i++) {
+ CustomTimer timer = new CustomTimer();
+ DynamicThrottlePolicy policy = new DynamicThrottlePolicy(timer);
+ int scaleFactor = (int) Math.pow(10, i);
+ long operations = 5_000 * scaleFactor;
+ // workPerSuccess determines the latency of the simulated server, which again determines the impact of the
+ // synthetic attractors of the algorithm, around latencies which give (close to) integer log10(1 / latency).
+ // With a value of 5, the impact is that the algorithm is pushed upwards slightly above 10k window size,
+ // which is the optimal choice for the case with 10000 clients.
+ // Change this to, e.g., 6 and the algorithm fails to climb as high, as the "ideal latency" is obtained at
+ // a lower latency than what is measured at 10k window size.
+ // On the other hand, changing it to 4 moves the attractor out of reach for the algorithm, which fails to
+ // push window size past 2k on its own.
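+ // Worked out, assuming one tick of the simulation is one ms: workPerSuccess = 5 gives
+ // roughly 5 ms per reply, and log10(1 / 0.005) = log10(200) ~ 2.301, whose fractional
+ // part is log10(2), which is exactly the default attractor described in the javadoc.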
+ int workPerSuccess = 5;
+ int numberOfWorkers = scaleFactor;
+ int maximumTasksPerWorker = 100000;
+ int workerParallelism = 1;
+ Summary summary = run(operations, workPerSuccess, numberOfWorkers, maximumTasksPerWorker, workerParallelism, timer, policy);
+
+ double minMaxPending = numberOfWorkers * workerParallelism;
+ double maxMaxPending = numberOfWorkers * maximumTasksPerWorker;
+ assertInRange(minMaxPending, summary.averagePending, maxMaxPending);
+ assertInRange(minMaxPending, summary.averageWindows[0], maxMaxPending);
+ assertInRange(1, summary.inefficiency, 1 + 0.2 * i); // Even slower ramp-up.
+ assertInRange(0, summary.waste, 0);
+ }
+ }
+
+ @Test
+ public void twoWeightedPoliciesWithUnboundedTaskQueue() {
+ for (int i = 0; i < 10; i++) {
+ long operations = 1_000_000;
+ int workPerSuccess = 6 + (int) (30 * Math.random());
+ int numberOfWorkers = 1 + (int) (10 * Math.random());
+ int maximumTasksPerWorker = 100_000;
+ int workerParallelism = 32;
+ CustomTimer timer = new CustomTimer();
+ DynamicThrottlePolicy policy1 = new DynamicThrottlePolicy(timer);
+ DynamicThrottlePolicy policy2 = new DynamicThrottlePolicy(timer).setWeight(0.5);
+ Summary summary = run(operations, workPerSuccess, numberOfWorkers, maximumTasksPerWorker, workerParallelism, timer, policy1, policy2);
+
+ double minMaxPending = numberOfWorkers * workerParallelism;
+ double maxMaxPending = numberOfWorkers * maximumTasksPerWorker;
+ assertInRange(minMaxPending, summary.averagePending, maxMaxPending);
+ // Actual shares are not distributed perfectly proportionally to weights, but close enough.
+ assertInRange(minMaxPending * 0.6, summary.averageWindows[0], maxMaxPending * 0.6);
+ assertInRange(minMaxPending * 0.4, summary.averageWindows[1], maxMaxPending * 0.4);
+ assertInRange(1, summary.inefficiency, 1.02);
+ assertInRange(0, summary.waste, 0);
+ }
+ }
+
+ @Test
+ public void tenPoliciesVeryParallelServerWithShortTaskQueue() {
+ for (int i = 0; i < 10; i++) {
+ long operations = 1_000_000;
+ int workPerSuccess = 6;
+ int numberOfWorkers = 6;
+ int maximumTasksPerWorker = 180 + (int) (120 * Math.random());
+ int workerParallelism = 60 + (int) (40 * Math.random());
+ CustomTimer timer = new CustomTimer();
+ int p = 10;
+ DynamicThrottlePolicy[] policies = IntStream.range(0, p)
+ .mapToObj(j -> new DynamicThrottlePolicy(timer)
+ .setWeight((j + 1.0) / p)
+ .setWindowSizeIncrement(5)
+ .setMinWindowSize(1))
+ .toArray(DynamicThrottlePolicy[]::new);
+ Summary summary = run(operations, workPerSuccess, numberOfWorkers, maximumTasksPerWorker, workerParallelism, timer, policies);
+
+ double minMaxPending = numberOfWorkers * workerParallelism;
+ double maxMaxPending = numberOfWorkers * maximumTasksPerWorker;
+ assertInRange(minMaxPending, summary.averagePending, maxMaxPending);
+ for (int j = 0; j < p; j++) {
+ double expectedShare = (j + 1) / (0.5 * p * (p + 1));
+ double imperfectionFactor = 1.5;
+ // Actual shares are not distributed perfectly proportionally to weights, but close enough.
+ assertInRange(minMaxPending * expectedShare / imperfectionFactor,
+ summary.averageWindows[j],
+ maxMaxPending * expectedShare * imperfectionFactor);
+ }
+ assertInRange(1.0, summary.inefficiency, 1.05);
+ assertInRange(0, summary.waste, 0.1);
+ }
+ }
+
+ static void assertInRange(double lower, double actual, double upper) {
+ System.err.printf("%10.4f <= %10.4f <= %10.4f\n", lower, actual, upper);
+ assertTrue(actual + " should be not be smaller than " + lower, lower <= actual);
+ assertTrue(actual + " should be not be greater than " + upper, upper >= actual);
+ }
+
+ private Summary run(long operations, int workPerSuccess, int numberOfWorkers, int maximumTasksPerWorker,
+ int workerParallelism, CustomTimer timer, DynamicThrottlePolicy... policies) {
+ System.err.printf("\n### Running %d operations of %d ticks each against %d workers with parallelism %d and queue size %d\n",
+ operations, workPerSuccess, numberOfWorkers, workerParallelism, maximumTasksPerWorker);
+
+ List<Integer> order = IntStream.range(0, policies.length).boxed().collect(toList());
+ MockServer resource = new MockServer(workPerSuccess, numberOfWorkers, maximumTasksPerWorker, workerParallelism);
+ AtomicLong outstanding = new AtomicLong(operations);
+ AtomicLong errors = new AtomicLong(0);
+ long ticks = 0;
+ long totalPending = 0;
+ double[] windows = new double[policies.length];
+ int[] pending = new int[policies.length];
+ while (outstanding.get() + resource.pending() > 0) {
+ Collections.shuffle(order);
+ for (int j = 0; j < policies.length; j++) {
+ int i = order.get(j);
+ DynamicThrottlePolicy policy = policies[i];
+ windows[i] += policy.getWindowSize();
+ while (policy.canSend(message, pending[i])) {
+ outstanding.decrementAndGet();
+ policy.processMessage(message);
+ ++pending[i];
+ resource.send(successful -> {
+ --pending[i];
+ if (successful)
+ policy.processReply(success);
+ else {
+ errors.incrementAndGet();
+ outstanding.incrementAndGet();
+ policy.processReply(error);
+ }
+ });
+ }
+ }
+ ++ticks;
+ totalPending += resource.pending();
+ resource.tick();
+ ++timer.millis;
+ }
+
+ for (int i = 0; i < windows.length; i++)
+ windows[i] /= ticks;
+
+ return new Summary(timer.milliTime() / (workPerSuccess * operations / (double) numberOfWorkers) * workerParallelism,
+ errors.get() / (double) operations,
+ totalPending / (double) ticks,
+ windows);
+ }
+
+ static class Summary {
+ final double inefficiency;
+ final double waste;
+ final double averagePending;
+ final double[] averageWindows;
+ Summary(double inefficiency, double waste, double averagePending, double[] averageWindows) {
+ this.inefficiency = inefficiency; // Time spent working / minimum time possible
+ this.waste = waste; // Number of error replies / number of successful replies
+ this.averagePending = averagePending; // Average number of pending operations in the server
+ this.averageWindows = averageWindows; // Average number of pending operations per policy
+ }
+ }
+
+ /**
+ * Resource shared between clients with throttle policies, with simulated throughput and efficiency.
+ *
+ * The model used for delay per request, and success/error, is derived from four basic attributes:
+ * <ul>
+ * <li>Ratio between work per successful and per failed reply</li>
+ * <li>Number of workers, each with minimum throughput equal to one failed reply per tick</li>
+ * <li>Parallelism of each worker — throughput increases linearly with queued tasks up to this number</li>
+ * <li>Maximum number of queued tasks per worker</li>
+ * </ul>
+ * <p>All messages are assumed to get a successful reply unless maximum pending replies is exceeded; when further
+ * messages arrive, the worker must immediately spend work to reject these before continuing its other work.
+ * The delay for a message is computed by assigning it to a random worker, and simulating the worker emptying its
+ * work queue. Since messages are assigned randomly, there will be some variation in delays and maximum throughput.
+ * The local correlation between max number of in-flight messages from the client, and its throughput,
+ * measured as number of successful replies per time unit, will start out at 1 and decrease gradually,
+ * eventually turning negative, as workers must spend work effort on failure replies as well.</p>
+ * <p> More specifically, a single worker yields a piecewise linear relationship between max pending and throughput —
+ * throughput first increases linearly with max pending, until saturated, and then remains constant, until overload,
+ * where it falls sharply. Several such workers together instead yield a throughput curve which gradually flattens
+ * as it approaches saturation, and also more gradually falls again, as overload is reached on some workers sometimes.</p>
+ */
+ static class MockServer {
+
+ final Random random = new Random();
+ final int workPerSuccess;
+ final int numberOfWorkers;
+ final int maximumTaskPerWorker;
+ final int workerParallelism;
+ final int[] currentTask;
+ final List<Deque<Consumer<Boolean>>> outstandingTasks;
+ int pending = 0;
+
+ MockServer(int workPerSuccess, int numberOfWorkers, int maximumTaskPerWorker, int workerParallelism) {
+ this.workPerSuccess = workPerSuccess;
+ this.numberOfWorkers = numberOfWorkers;
+ this.maximumTaskPerWorker = maximumTaskPerWorker;
+ this.workerParallelism = workerParallelism;
+ this.currentTask = new int[numberOfWorkers];
+ this.outstandingTasks = IntStream.range(0, numberOfWorkers)
+ .mapToObj(__ -> new ArrayDeque<Consumer<Boolean>>())
+ .collect(toUnmodifiableList());
+ }
+
+ /** Performs a tick for each worker. */
+ void tick() {
+ for (int i = 0; i < numberOfWorkers; i++)
+ tick(i);
+ }
+
+ private void tick(int worker) {
+ Deque<Consumer<Boolean>> tasks = outstandingTasks.get(worker);
+ for (int i = 0; i < Math.min(workerParallelism, tasks.size()); i++) {
+ if (currentTask[worker] == 0) {
+ if (tasks.size() > maximumTaskPerWorker) {
+ tasks.pop().accept(false);
+ continue; // Spend work to signal failure to one excess task.
+ }
+ currentTask[worker] = workPerSuccess; // Start work on next task.
+ }
+ if (--currentTask[worker] == 0)
+ tasks.poll().accept(true); // Signal success to the completed task.
+ }
+ }
+
+ void send(Consumer<Boolean> replyHandler) {
+ ++pending;
+ outstandingTasks.get(random.nextInt(numberOfWorkers))
+ .addLast(outcome -> { --pending; replyHandler.accept(outcome); });
+ }
+
+ int pending() { return pending; }
+
+ }
+
+} \ No newline at end of file