summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author Jon Marius Venstad <venstad@gmail.com> 2021-06-22 16:58:45 +0200
committer Jon Marius Venstad <venstad@gmail.com> 2021-06-22 16:58:45 +0200
commit cf14cb26314664c33eea2b6fbc6a32d989cad812 (patch)
tree bfab7b39df858c3dfa949821e7e281c5154c64aa
parent 05584e3c36dda74570e4b95044eb39f485e4f484 (diff)
Use more robust throughput calculation with less overhead
-rw-r--r-- vespa-feed-client/src/main/java/ai/vespa/feed/client/DynamicThrottler.java 72
-rw-r--r-- vespa-feed-client/src/main/java/ai/vespa/feed/client/StaticThrottler.java 6
-rw-r--r-- vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java 2
3 files changed, 37 insertions, 43 deletions
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/DynamicThrottler.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/DynamicThrottler.java
index 6f4e4e752f0..b0279d27bf8 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/DynamicThrottler.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/DynamicThrottler.java
@@ -1,11 +1,9 @@
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package ai.vespa.feed.client;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
import static java.lang.Math.log;
import static java.lang.Math.max;
@@ -20,57 +18,53 @@ import static java.lang.Math.random;
*/
public class DynamicThrottler extends StaticThrottler {
+ private final AtomicLong ok = new AtomicLong(0);
private final AtomicLong targetInflight;
- private long updateNanos = 0;
- private final List<AtomicLong> latencies = new ArrayList<>();
- private final double weight = 0.9; // Higher weight favours higher (own) throughput, at the cost of (shared) latency.
+ private final double weight = 0.7;
+ private final double[] throughputs = new double[128];
+ private long startNanos = System.nanoTime();
+ private long sent = 0;
public DynamicThrottler(FeedClientBuilder builder) {
super(builder);
- this.targetInflight = new AtomicLong(128L * builder.connectionsPerEndpoint * builder.endpoints.size());
- for (int i = 0; i < 128; i++)
- latencies.add(new AtomicLong(-1));
+ targetInflight = new AtomicLong(8 * minInflight);
}
@Override
- public void sent(long inflight, CompletableFuture<HttpResponse> vessel) {
- long startNanos = System.nanoTime();
- if (updateNanos == 0) updateNanos = System.nanoTime();
- boolean update = startNanos - updateNanos >= 1e8; // Ship ten updates per second.
- if (update) updateNanos = startNanos;
+ public void sent(long __, CompletableFuture<HttpResponse> ___) {
+ double currentInflight = targetInflight.get();
+ if (++sent * sent * sent < 1e2 * currentInflight * currentInflight)
+ return;
- vessel.whenComplete((response, thrown) -> {
- // Use buckets for latency measurements, with inflight along a log scale,
- // and with minInflight and maxInflight at the ends.
- int index = (int) (latencies.size() * log(max(1, (double) inflight / minInflight))
- / log(256)); // 4096 (server max streams per connection) / 16 (our min per connection)
- long nowNanos = System.nanoTime();
- long latencyNanos = nowNanos - startNanos;
- latencies.get(index).set(latencyNanos);
- if ( ! update)
- return;
+ sent = 0;
+ double elapsedNanos = -startNanos + (startNanos = System.nanoTime());
+ double currentThroughput = ok.getAndSet(0) / elapsedNanos;
- // Loop over latency measurements and pick the one which optimises throughput and latency.
- double choice = -1;
- double max = -1;
- for (int i = latencies.size(); i-- > 0; ) {
- double latency = latencies.get(i).get();
- if (latency < 0) continue; // Skip unknown values.
- double target = minInflight * pow(256, (i + 0.5) / latencies.size());
- double objective = pow(target, weight) / latency; // Optimise throughput (weight), but also latency (1 - weight).
- if (objective > max) {
- max = objective;
- choice = target;
- }
+ // Use buckets for throughput over inflight, along the log-scale, in [minInflight, maxInflight).
+ int index = (int) (throughputs.length * log(max(1, min(255, currentInflight / minInflight)))
+ / log(256)); // 4096 (server max streams per connection) / 16 (our min per connection)
+ throughputs[index] = currentThroughput;
+
+ // Loop over throughput measurements and pick the one which optimises throughput and latency.
+ double choice = currentInflight;
+ double max = -1;
+ for (int i = throughputs.length; i-- > 0; ) {
+ if (throughputs[i] == 0) continue; // Skip unknown values.
+ double inflight = minInflight * pow(256, (i + 0.5) / throughputs.length);
+ double objective = throughputs[i] * pow(inflight, (weight - 1)); // Optimise throughput (weight), but also latency (1 - weight).
+ if (objective > max) {
+ max = objective;
+ choice = inflight;
}
- long target = (long) ((random() * 0.25 + 0.90) * choice); // Random walk, skewed towards increase.
- targetInflight.set(max(minInflight, min(maxInflight, target)));
- });
+ }
+ long target = (long) ((random() * 0.20 + 0.92) * choice); // Random walk, skewed towards increase.
+ targetInflight.set(max(minInflight, min(maxInflight, target)));
}
@Override
public void success() {
super.success();
+ ok.incrementAndGet();
}
@Override
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/StaticThrottler.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/StaticThrottler.java
index 4e0c4fe90f0..331bc213edf 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/StaticThrottler.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/StaticThrottler.java
@@ -19,9 +19,9 @@ public class StaticThrottler implements FeedClient.Throttler {
private final AtomicLong targetX10;
public StaticThrottler(FeedClientBuilder builder) {
- this.maxInflight = builder.connectionsPerEndpoint * (long) builder.maxStreamsPerConnection;
- this.minInflight = builder.connectionsPerEndpoint * (long) min(16, builder.maxStreamsPerConnection);
- this.targetX10 = new AtomicLong(10 * maxInflight); // 10x the actual value to allow for smaller updates.
+ minInflight = 16L * builder.connectionsPerEndpoint * builder.endpoints.size();
+ maxInflight = 256 * minInflight; // 4096 max streams per connection on the server side.
+ targetX10 = new AtomicLong(10 * maxInflight); // 10x the actual value to allow for smaller updates.
}
@Override
diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java
index 21ab6889e6e..405c9d493c1 100644
--- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java
+++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java
@@ -33,7 +33,7 @@ class HttpRequestStrategyTest {
@Test
void testConcurrency() {
- int documents = 1 << 16;
+ int documents = 1 << 24;
HttpRequest request = new HttpRequest("PUT", "/", null, null);
HttpResponse response = HttpResponse.of(200, "{}".getBytes(UTF_8));
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);