aboutsummaryrefslogtreecommitdiffstats
path: root/vespa-feed-client
diff options
context:
space:
mode:
authorjonmv <venstad@gmail.com>2024-05-13 12:21:20 +0200
committerjonmv <venstad@gmail.com>2024-05-13 12:21:20 +0200
commitc6d8d30bd76f426cd8b2fd45c125c6da57c768f1 (patch)
tree3fa36fd75cf812ef9291ea821bb5ae72d3add106 /vespa-feed-client
parentf951681e3e1076606da092437a32c92454533e14 (diff)
Improve feed throttler and tests
Diffstat (limited to 'vespa-feed-client')
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/DynamicThrottler.java24
-rw-r--r--vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/DynamicThrottlerTest.java30
2 files changed, 49 insertions, 5 deletions
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/DynamicThrottler.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/DynamicThrottler.java
index 81e972fe45d..567788b8501 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/DynamicThrottler.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/DynamicThrottler.java
@@ -43,22 +43,36 @@ public class DynamicThrottler extends StaticThrottler {
// Use buckets for throughput over inflight, along the log-scale, in [minInflight, maxInflight).
int index = (int) (throughputs.length * log(max(1, min(255, currentInflight / minInflight)))
- / log(256)); // 4096 (server max streams per connection) / 16 (our min per connection)
+ / log(256)); // 512 (server max streams per connection) / 2 (our min per connection)
throughputs[index] = currentThroughput;
// Loop over throughput measurements and pick the one which optimises throughput and latency.
- double choice = currentInflight;
+ double best = currentInflight;
double max = -1;
- for (int i = throughputs.length; i-- > 0; ) {
+ int j = -1, k = -1, choice = 0;
+ double s = 0;
+ for (int i = 0; i < throughputs.length; i++) {
if (throughputs[i] == 0) continue; // Skip unknown values.
double inflight = minInflight * pow(256, (i + 0.5) / throughputs.length);
double objective = throughputs[i] * pow(inflight, (weight - 1)); // Optimise throughput (weight), but also latency (1 - weight).
if (objective > max) {
max = objective;
- choice = inflight;
+ best = inflight;
+ choice = i;
}
+ // Additionally, smooth the throughput values, to reduce the impact of noise, and reduce jumpiness.
+ if (j != -1) {
+ double t = throughputs[j];
+ if (k != -1) throughputs[j] = (2 * t + throughputs[i] + s) / 4;
+ s = t;
+ }
+ k = j;
+ j = i;
}
- long target = (long) ((random() * 0.20 + 0.92) * choice); // Random walk, skewed towards increase.
+ long target = (long) ((random() * 0.40 + 0.84) * best + random() * 4 - 1); // Random step, skewed towards increase.
+ // If the best inflight is at the high end of the known, we override the random walk to speed up upwards exploration.
+ if (choice == j && choice + 1 < throughputs.length)
+ target = (long) (1 + minInflight * pow(256, (choice + 1.5) / throughputs.length));
targetInflight.set(max(minInflight, min(maxInflight, target)));
}
diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/DynamicThrottlerTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/DynamicThrottlerTest.java
new file mode 100644
index 00000000000..7e07fc6e116
--- /dev/null
+++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/DynamicThrottlerTest.java
@@ -0,0 +1,30 @@
+package ai.vespa.feed.client.impl;
+
+import org.junit.jupiter.api.Test;
+
+import java.net.URI;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * @author jonmv
+ */
+class DynamicThrottlerTest {
+
+ @Test
+ void testThrottler() {
+ DynamicThrottler throttler = new DynamicThrottler(new FeedClientBuilderImpl(List.of(URI.create("http://localhost:8080"))));
+ assertEquals(16, throttler.targetInflight());
+
+ for (int i = 0; i < 30; i++) {
+ throttler.sent(1, null);
+ throttler.success();
+ }
+ assertEquals(18, throttler.targetInflight());
+
+ throttler.throttled(34);
+ assertEquals(17, throttler.targetInflight());
+ }
+
+}