summaryrefslogtreecommitdiffstats
path: root/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/DynamicThrottler.java
diff options
context:
space:
mode:
Diffstat (limited to 'vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/DynamicThrottler.java')
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/DynamicThrottler.java28
1 files changed, 21 insertions, 7 deletions
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/DynamicThrottler.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/DynamicThrottler.java
index 951a1776b6f..567788b8501 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/DynamicThrottler.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/DynamicThrottler.java
@@ -28,12 +28,12 @@ public class DynamicThrottler extends StaticThrottler {
public DynamicThrottler(FeedClientBuilderImpl builder) {
super(builder);
- targetInflight = new AtomicLong(8 * minInflight);
+ targetInflight = new AtomicLong(minInflight);
}
@Override
public void sent(long __, CompletableFuture<HttpResponse> ___) {
- double currentInflight = targetInflight.get();
+ double currentInflight = targetInflight();
if (++sent * sent * sent < 1e2 * currentInflight * currentInflight)
return;
@@ -43,22 +43,36 @@ public class DynamicThrottler extends StaticThrottler {
// Use buckets for throughput over inflight, along the log-scale, in [minInflight, maxInflight).
int index = (int) (throughputs.length * log(max(1, min(255, currentInflight / minInflight)))
- / log(256)); // 4096 (server max streams per connection) / 16 (our min per connection)
+ / log(256)); // 512 (server max streams per connection) / 2 (our min per connection)
throughputs[index] = currentThroughput;
// Loop over throughput measurements and pick the one which optimises throughput and latency.
- double choice = currentInflight;
+ double best = currentInflight;
double max = -1;
- for (int i = throughputs.length; i-- > 0; ) {
+ int j = -1, k = -1, choice = 0;
+ double s = 0;
+ for (int i = 0; i < throughputs.length; i++) {
if (throughputs[i] == 0) continue; // Skip unknown values.
double inflight = minInflight * pow(256, (i + 0.5) / throughputs.length);
double objective = throughputs[i] * pow(inflight, (weight - 1)); // Optimise throughput (weight), but also latency (1 - weight).
if (objective > max) {
max = objective;
- choice = inflight;
+ best = inflight;
+ choice = i;
}
+ // Additionally, smooth the throughput values, to reduce the impact of noise, and reduce jumpiness.
+ if (j != -1) {
+ double t = throughputs[j];
+ if (k != -1) throughputs[j] = (2 * t + throughputs[i] + s) / 4;
+ s = t;
+ }
+ k = j;
+ j = i;
}
- long target = (long) ((random() * 0.20 + 0.92) * choice); // Random walk, skewed towards increase.
+ long target = (long) ((random() * 0.40 + 0.84) * best + random() * 4 - 1); // Random step, skewed towards increase.
+ // If the best inflight is at the high end of the known, we override the random walk to speed up upwards exploration.
+ if (choice == j && choice + 1 < throughputs.length)
+ target = (long) (1 + minInflight * pow(256, (choice + 1.5) / throughputs.length));
targetInflight.set(max(minInflight, min(maxInflight, target)));
}