diff options
author | Jon Marius Venstad <venstad@gmail.com> | 2021-06-21 10:03:48 +0200 |
---|---|---|
committer | Jon Marius Venstad <venstad@gmail.com> | 2021-06-21 10:03:48 +0200 |
commit | cae18255a54eadc1d7949dd3564a8cc9a6056c75 (patch) | |
tree | d08b9b1accd8e6e17c53bfb4e54ba2f12bb67a96 /vespa-feed-client | |
parent | 7f0b51b604fd1e84be5684a8389c9e111fc5ede7 (diff) |
Some adjustments and fix int -> float division, try dynamic throttler
Diffstat (limited to 'vespa-feed-client')
3 files changed, 12 insertions, 12 deletions
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/DynamicThrottler.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/DynamicThrottler.java index 6d23fed33f8..53e2ece3125 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/DynamicThrottler.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/DynamicThrottler.java @@ -23,12 +23,12 @@ public class DynamicThrottler extends StaticThrottler { private final AtomicLong targetInflight; private long updateNanos = 0; private final List<AtomicReference<Double>> latencies = new ArrayList<>(); - private final double weight = 0.7; // Higher weight favours higher (own) throughput, at the cost of (shared) latency. + private final double weight = 0.8; // Higher weight favours higher (own) throughput, at the cost of (shared) latency. public DynamicThrottler(FeedClientBuilder builder) { super(builder); - this.targetInflight = new AtomicLong((long) (pow(minInflight, 0.5) * pow(maxInflight, 0.5))); - for (int i = 0; i < 1024; i++) + this.targetInflight = new AtomicLong(128L * builder.connectionsPerEndpoint * builder.endpoints.size()); + for (int i = 0; i < 128; i++) latencies.add(new AtomicReference<>(-1.0)); } @@ -42,11 +42,11 @@ public class DynamicThrottler extends StaticThrottler { vessel.whenComplete((response, thrown) -> { // Use buckets for latency measurements, with inflight along a log scale, // and with minInflight and maxInflight at the ends. - int index = (int) (latencies.size() * log(max(1, inflight / minInflight)) - / log((double) maxInflight / minInflight)); + int index = (int) (latencies.size() * log(max(1, (double) inflight / minInflight)) + / log(256)); // 4096 (server max streams per connection) / 16 (our min per connection) long nowNanos = System.nanoTime(); long latencyNanos = nowNanos - startNanos; - double w1 = 0.5; // Update values with some of the new measurement, some of the old. + double w1 = 1e-2; // Update values with some of the new measurement, some of the old. double w2 = response != null && response.code() / 100 == 2 ? 1 - w1 : 1; // Punish non-successes. latencies.get(index).updateAndGet(latency -> latency < 0 ? latencyNanos : latencyNanos * w1 + latency * w2); if ( ! update) @@ -58,14 +58,14 @@ public class DynamicThrottler extends StaticThrottler { for (int i = latencies.size(); i-- > 0; ) { double latency = latencies.get(i).get(); if (latency < 0) continue; // Skip unknown values. - double target = minInflight * pow((double) maxInflight / minInflight, (double) i / latencies.size()); + double target = minInflight * pow(256, (i + 0.5) / latencies.size()); double objective = pow(target, weight) / latency; // Optimise throughput (weight), but also latency (1 - weight). if (objective > max) { max = objective; choice = target; } } - long target = (long) ((random() * 0.1 + 0.97) * choice); // Random walk, skewed towards increase. + long target = (long) ((random() * 0.15 + 0.95) * choice); // Random walk, skewed towards increase. targetInflight.set(max(minInflight, min(maxInflight, target))); }); } diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java index 763357d8fbb..e9cd0baba5b 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java @@ -63,7 +63,7 @@ class HttpRequestStrategy implements RequestStrategy { this.cluster = builder.benchmark ? new BenchmarkingCluster(cluster) : cluster; this.strategy = builder.retryStrategy; this.breaker = builder.circuitBreaker; - this.throttler = new StaticThrottler(builder); + this.throttler = new DynamicThrottler(builder); Thread dispatcher = new Thread(this::dispatch, "feed-client-dispatcher"); dispatcher.setDaemon(true); @@ -99,7 +99,6 @@ class HttpRequestStrategy implements RequestStrategy { delayedCount.incrementAndGet(); queue.offer(() -> { cluster.dispatch(request, vessel); - throttler.sent(inflight.get(), vessel); }); } @@ -206,6 +205,7 @@ class HttpRequestStrategy implements RequestStrategy { if (previous == null) { acquireSlot(); offer(request, vessel); + throttler.sent(inflight.get(), result); } else previous.whenComplete((__, ___) -> offer(request, vessel)); diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java index 4ef24713e9a..21ab6889e6e 100644 --- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java @@ -40,8 +40,8 @@ class HttpRequestStrategyTest { Cluster cluster = new BenchmarkingCluster((__, vessel) -> executor.schedule(() -> vessel.complete(response), (int) (Math.random() * 2 * 10), TimeUnit.MILLISECONDS)); HttpRequestStrategy strategy = new HttpRequestStrategy(FeedClientBuilder.create(URI.create("https://dummy.com:123")) - .setConnectionsPerEndpoint(1 << 4) - .setMaxStreamPerConnection(1 << 20), + .setConnectionsPerEndpoint(1 << 10) + .setMaxStreamPerConnection(1 << 12), cluster); CountDownLatch latch = new CountDownLatch(1); new Thread(() -> { |