From cf14cb26314664c33eea2b6fbc6a32d989cad812 Mon Sep 17 00:00:00 2001 From: Jon Marius Venstad Date: Tue, 22 Jun 2021 16:58:45 +0200 Subject: Use more robust throughput calculation with less overhead --- .../ai/vespa/feed/client/DynamicThrottler.java | 72 ++++++++++------------ .../java/ai/vespa/feed/client/StaticThrottler.java | 6 +- .../vespa/feed/client/HttpRequestStrategyTest.java | 2 +- 3 files changed, 37 insertions(+), 43 deletions(-) (limited to 'vespa-feed-client') diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/DynamicThrottler.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/DynamicThrottler.java index 6f4e4e752f0..b0279d27bf8 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/DynamicThrottler.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/DynamicThrottler.java @@ -1,11 +1,9 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package ai.vespa.feed.client; -import java.util.ArrayList; -import java.util.List; +import java.util.Arrays; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; import static java.lang.Math.log; import static java.lang.Math.max; @@ -20,57 +18,53 @@ import static java.lang.Math.random; */ public class DynamicThrottler extends StaticThrottler { + private final AtomicLong ok = new AtomicLong(0); private final AtomicLong targetInflight; - private long updateNanos = 0; - private final List latencies = new ArrayList<>(); - private final double weight = 0.9; // Higher weight favours higher (own) throughput, at the cost of (shared) latency. + private final double weight = 0.7; + private final double[] throughputs = new double[128]; + private long startNanos = System.nanoTime(); + private long sent = 0; public DynamicThrottler(FeedClientBuilder builder) { super(builder); - this.targetInflight = new AtomicLong(128L * builder.connectionsPerEndpoint * builder.endpoints.size()); - for (int i = 0; i < 128; i++) - latencies.add(new AtomicLong(-1)); + targetInflight = new AtomicLong(8 * minInflight); } @Override - public void sent(long inflight, CompletableFuture vessel) { - long startNanos = System.nanoTime(); - if (updateNanos == 0) updateNanos = System.nanoTime(); - boolean update = startNanos - updateNanos >= 1e8; // Ship ten updates per second. - if (update) updateNanos = startNanos; + public void sent(long __, CompletableFuture ___) { + double currentInflight = targetInflight.get(); + if (++sent * sent * sent < 1e2 * currentInflight * currentInflight) + return; - vessel.whenComplete((response, thrown) -> { - // Use buckets for latency measurements, with inflight along a log scale, - // and with minInflight and maxInflight at the ends. - int index = (int) (latencies.size() * log(max(1, (double) inflight / minInflight)) - / log(256)); // 4096 (server max streams per connection) / 16 (our min per connection) - long nowNanos = System.nanoTime(); - long latencyNanos = nowNanos - startNanos; - latencies.get(index).set(latencyNanos); - if ( ! update) - return; + sent = 0; + double elapsedNanos = -startNanos + (startNanos = System.nanoTime()); + double currentThroughput = ok.getAndSet(0) / elapsedNanos; - // Loop over latency measurements and pick the one which optimises throughput and latency. - double choice = -1; - double max = -1; - for (int i = latencies.size(); i-- > 0; ) { - double latency = latencies.get(i).get(); - if (latency < 0) continue; // Skip unknown values. - double target = minInflight * pow(256, (i + 0.5) / latencies.size()); - double objective = pow(target, weight) / latency; // Optimise throughput (weight), but also latency (1 - weight). - if (objective > max) { - max = objective; - choice = target; - } + // Use buckets for throughput over inflight, along the log-scale, in [minInflight, maxInflight). + int index = (int) (throughputs.length * log(max(1, min(255, currentInflight / minInflight))) + / log(256)); // 4096 (server max streams per connection) / 16 (our min per connection) + throughputs[index] = currentThroughput; + + // Loop over throughput measurements and pick the one which optimises throughput and latency. + double choice = currentInflight; + double max = -1; + for (int i = throughputs.length; i-- > 0; ) { + if (throughputs[i] == 0) continue; // Skip unknown values. + double inflight = minInflight * pow(256, (i + 0.5) / throughputs.length); + double objective = throughputs[i] * pow(inflight, (weight - 1)); // Optimise throughput (weight), but also latency (1 - weight). + if (objective > max) { + max = objective; + choice = inflight; } - long target = (long) ((random() * 0.25 + 0.90) * choice); // Random walk, skewed towards increase. - targetInflight.set(max(minInflight, min(maxInflight, target))); - }); + } + long target = (long) ((random() * 0.20 + 0.92) * choice); // Random walk, skewed towards increase. + targetInflight.set(max(minInflight, min(maxInflight, target))); } @Override public void success() { super.success(); + ok.incrementAndGet(); } @Override diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/StaticThrottler.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/StaticThrottler.java index 4e0c4fe90f0..331bc213edf 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/StaticThrottler.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/StaticThrottler.java @@ -19,9 +19,9 @@ public class StaticThrottler implements FeedClient.Throttler { private final AtomicLong targetX10; public StaticThrottler(FeedClientBuilder builder) { - this.maxInflight = builder.connectionsPerEndpoint * (long) builder.maxStreamsPerConnection; - this.minInflight = builder.connectionsPerEndpoint * (long) min(16, builder.maxStreamsPerConnection); - this.targetX10 = new AtomicLong(10 * maxInflight); // 10x the actual value to allow for smaller updates. + minInflight = 16L * builder.connectionsPerEndpoint * builder.endpoints.size(); + maxInflight = 256 * minInflight; // 4096 max streams per connection on the server side. + targetX10 = new AtomicLong(10 * maxInflight); // 10x the actual value to allow for smaller updates. } @Override diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java index 21ab6889e6e..405c9d493c1 100644 --- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java @@ -33,7 +33,7 @@ class HttpRequestStrategyTest { @Test void testConcurrency() { - int documents = 1 << 16; + int documents = 1 << 24; HttpRequest request = new HttpRequest("PUT", "/", null, null); HttpResponse response = HttpResponse.of(200, "{}".getBytes(UTF_8)); ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); -- cgit v1.2.3