diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2024-05-13 13:22:59 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-05-13 13:22:59 +0200 |
commit | 8cd2544b6fdfc4000b306d1353d2b8b926363877 (patch) | |
tree | 6090378c8fb4f7ece8de1e0b67f176e883d3f190 | |
parent | 8705645af3f088a6d2b2a8b099c265eecc9ec3c1 (diff) | |
parent | c6d8d30bd76f426cd8b2fd45c125c6da57c768f1 (diff) |
Merge pull request #31179 from vespa-engine/jonmv/improved-feed-throttle-algorithm
Jonmv/improved feed throttle algorithm
8 files changed, 93 insertions, 24 deletions
diff --git a/client/README.md b/client/README.md index f51db87d631..e730614230f 100644 --- a/client/README.md +++ b/client/README.md @@ -39,7 +39,7 @@ This is a [work-in-progress javascript app](js/app) for querying a Vespa applica <!-- ToDo: move this / demote this somehow --> -### vespa_query_dsl +### vespa\_query\_dsl This lib is used for composing Vespa [YQL queries](https://docs.vespa.ai/en/reference/query-language-reference.html). diff --git a/client/go/internal/vespa/document/throttler.go b/client/go/internal/vespa/document/throttler.go index 39900156563..3eb0ccd17f6 100644 --- a/client/go/internal/vespa/document/throttler.go +++ b/client/go/internal/vespa/document/throttler.go @@ -37,8 +37,8 @@ type dynamicThrottler struct { func newThrottler(connections int, nowFunc func() time.Time) *dynamicThrottler { var ( - minInflight = 16 * int64(connections) - maxInflight = 256 * minInflight // 4096 max streams per connection on the server side + minInflight = 2 * int64(connections) + maxInflight = 256 * minInflight // 512 max streams per connection on the server side ) t := &dynamicThrottler{ minInflight: minInflight, @@ -49,7 +49,7 @@ func newThrottler(connections int, nowFunc func() time.Time) *dynamicThrottler { start: nowFunc(), now: nowFunc, } - t.targetInflight.Store(8 * minInflight) + t.targetInflight.Store(minInflight) t.targetTimesTen.Store(10 * maxInflight) return t } @@ -57,7 +57,7 @@ func newThrottler(connections int, nowFunc func() time.Time) *dynamicThrottler { func NewThrottler(connections int) Throttler { return newThrottler(connections, time.Now) } func (t *dynamicThrottler) Sent() { - currentInflight := t.targetInflight.Load() + currentInflight := t.TargetInflight() t.sent++ if t.sent*t.sent*t.sent < 100*currentInflight*currentInflight { return @@ -73,8 +73,12 @@ func (t *dynamicThrottler) Sent() { t.throughputs[index] = currentThroughput // Loop over throughput measurements and pick the one which optimises throughput and latency. - choice := float64(currentInflight) + best := float64(currentInflight) maxObjective := float64(-1) + choice := 0 + j := -1 + k := -1 + s := 0.0 for i := len(t.throughputs) - 1; i >= 0; i-- { if t.throughputs[i] == 0 { continue // Skip unknown values @@ -83,10 +87,25 @@ func (t *dynamicThrottler) Sent() { objective := t.throughputs[i] * math.Pow(inflight, throttlerWeight-1) // Optimise throughput (weight), but also latency (1 - weight) if objective > maxObjective { maxObjective = objective - choice = inflight + best = inflight + choice = i } + // Additionally, smooth the throughput values, to reduce the impact of noise, and reduce jumpiness + if j != -1 { + u := t.throughputs[j] + if k != -1 { + t.throughputs[j] = (2*u + t.throughputs[i] + s) / 4 + } + s = u + } + k = j + j = i + } + target := int64((rand.Float64()*0.40+0.84)*best + rand.Float64()*4 - 1) // Random walk, skewed towards increase + // If the best inflight is at the high end of the known, we override the random walk to speed up upwards exploration + if choice == j && choice+1 < len(t.throughputs) { + target = int64(1 + float64(t.minInflight)*math.Pow(256, (float64(choice)+1.5)/float64(len(t.throughputs)))) } - target := int64((rand.Float64()*0.20 + 0.92) * choice) // Random walk, skewed towards increase t.targetInflight.Store(max(t.minInflight, min(t.maxInflight, target))) } diff --git a/client/go/internal/vespa/document/throttler_test.go b/client/go/internal/vespa/document/throttler_test.go index 03f0bc75bdc..b386e0d5105 100644 --- a/client/go/internal/vespa/document/throttler_test.go +++ b/client/go/internal/vespa/document/throttler_test.go @@ -9,14 +9,19 @@ import ( func TestThrottler(t *testing.T) { clock := &manualClock{tick: time.Second} tr := newThrottler(8, clock.now) - for i := 0; i < 100; i++ { + + if got, want := tr.TargetInflight(), int64(16); got != want { + t.Errorf("got TargetInflight() = %d, but want %d", got, want) + } + for i := 0; i < 30; i++ { tr.Sent() + tr.Success() } - if got, want := tr.TargetInflight(), int64(1024); got != want { + if got, want := tr.TargetInflight(), int64(18); got != want { t.Errorf("got TargetInflight() = %d, but want %d", got, want) } - tr.Throttled(5) - if got, want := tr.TargetInflight(), int64(128); got != want { + tr.Throttled(34) + if got, want := tr.TargetInflight(), int64(17); got != want { t.Errorf("got TargetInflight() = %d, but want %d", got, want) } } diff --git a/messagebus/src/main/java/com/yahoo/messagebus/DynamicThrottlePolicy.java b/messagebus/src/main/java/com/yahoo/messagebus/DynamicThrottlePolicy.java index 97f681404e9..1a42b688437 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/DynamicThrottlePolicy.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/DynamicThrottlePolicy.java @@ -167,10 +167,10 @@ public class DynamicThrottlePolicy extends StaticThrottlePolicy { } /** - * Determines where on each latency level the attractor sits. 2 is at the very end, and makes this to *boom*. + * Determines where on each latency level the attractor sits. 2 is at the very end, and makes this go *boom*. * 0.2 is at the very start, and makes the algorithm more conservative. Probably fine to stay away from this. */ - // Original javadoc is non-sense, but kept for historical reasons. + // Original javadoc is nonsense, but kept for historical reasons. /* * Sets the lower efficiency threshold at which the algorithm should perform window size back off. Efficiency is * the correlation between throughput and window size. The algorithm will increase the window size until efficiency diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/DynamicThrottler.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/DynamicThrottler.java index 951a1776b6f..567788b8501 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/DynamicThrottler.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/DynamicThrottler.java @@ -28,12 +28,12 @@ public class DynamicThrottler extends StaticThrottler { public DynamicThrottler(FeedClientBuilderImpl builder) { super(builder); - targetInflight = new AtomicLong(8 * minInflight); + targetInflight = new AtomicLong(minInflight); } @Override public void sent(long __, CompletableFuture<HttpResponse> ___) { - double currentInflight = targetInflight.get(); + double currentInflight = targetInflight(); if (++sent * sent * sent < 1e2 * currentInflight * currentInflight) return; @@ -43,22 +43,36 @@ public class DynamicThrottler extends StaticThrottler { // Use buckets for throughput over inflight, along the log-scale, in [minInflight, maxInflight). int index = (int) (throughputs.length * log(max(1, min(255, currentInflight / minInflight))) - / log(256)); // 4096 (server max streams per connection) / 16 (our min per connection) + / log(256)); // 512 (server max streams per connection) / 2 (our min per connection) throughputs[index] = currentThroughput; // Loop over throughput measurements and pick the one which optimises throughput and latency. - double choice = currentInflight; + double best = currentInflight; double max = -1; - for (int i = throughputs.length; i-- > 0; ) { + int j = -1, k = -1, choice = 0; + double s = 0; + for (int i = 0; i < throughputs.length; i++) { if (throughputs[i] == 0) continue; // Skip unknown values. double inflight = minInflight * pow(256, (i + 0.5) / throughputs.length); double objective = throughputs[i] * pow(inflight, (weight - 1)); // Optimise throughput (weight), but also latency (1 - weight). if (objective > max) { max = objective; - choice = inflight; + best = inflight; + choice = i; } + // Additionally, smooth the throughput values, to reduce the impact of noise, and reduce jumpiness. + if (j != -1) { + double t = throughputs[j]; + if (k != -1) throughputs[j] = (2 * t + throughputs[i] + s) / 4; + s = t; + } + k = j; + j = i; } - long target = (long) ((random() * 0.20 + 0.92) * choice); // Random walk, skewed towards increase. + long target = (long) ((random() * 0.40 + 0.84) * best + random() * 4 - 1); // Random step, skewed towards increase. + // If the best inflight is at the high end of the known, we override the random walk to speed up upwards exploration. + if (choice == j && choice + 1 < throughputs.length) + target = (long) (1 + minInflight * pow(256, (choice + 1.5) / throughputs.length)); targetInflight.set(max(minInflight, min(maxInflight, target))); } diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/StaticThrottler.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/StaticThrottler.java index 9010b0a7ad8..f0ee434e87c 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/StaticThrottler.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/StaticThrottler.java @@ -22,7 +22,7 @@ public class StaticThrottler implements Throttler { public StaticThrottler(FeedClientBuilderImpl builder) { minInflight = 2L * builder.connectionsPerEndpoint * builder.endpoints.size(); - maxInflight = 256 * minInflight; // 4096 max streams per connection on the server side. + maxInflight = 256 * minInflight; // 512 max streams per connection on the server side. targetX10 = new AtomicLong(10 * maxInflight); // 10x the actual value to allow for smaller updates. } diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/DynamicThrottlerTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/DynamicThrottlerTest.java new file mode 100644 index 00000000000..7e07fc6e116 --- /dev/null +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/DynamicThrottlerTest.java @@ -0,0 +1,30 @@ +package ai.vespa.feed.client.impl; + +import org.junit.jupiter.api.Test; + +import java.net.URI; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** + * @author jonmv + */ +class DynamicThrottlerTest { + + @Test + void testThrottler() { + DynamicThrottler throttler = new DynamicThrottler(new FeedClientBuilderImpl(List.of(URI.create("http://localhost:8080")))); + assertEquals(16, throttler.targetInflight()); + + for (int i = 0; i < 30; i++) { + throttler.sent(1, null); + throttler.success(); + } + assertEquals(18, throttler.targetInflight()); + + throttler.throttled(34); + assertEquals(17, throttler.targetInflight()); + } + +} diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java index 54fab9b859b..b1a04ac9ed4 100644 --- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java @@ -33,6 +33,7 @@ import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.OPEN; import static java.nio.charset.StandardCharsets.UTF_8; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -105,7 +106,7 @@ class HttpRequestStrategyTest { cluster.expect((__, vessel) -> vessel.completeExceptionally(new RuntimeException("boom"))); ExecutionException expected = assertThrows(ExecutionException.class, () -> strategy.enqueue(id1, request).get()); - assertTrue(expected.getCause() instanceof FeedException); + assertInstanceOf(FeedException.class, expected.getCause()); assertEquals("java.lang.RuntimeException: boom", expected.getCause().getMessage()); assertEquals(1, strategy.stats().requests()); @@ -200,7 +201,7 @@ class HttpRequestStrategyTest { @Override public int retries() { return 1; } }) .setCircuitBreaker(breaker) - .setConnectionsPerEndpoint(1), + .setConnectionsPerEndpoint(3), // Must be >= 0.5x text ops. cluster); DocumentId id1 = DocumentId.of("ns", "type", "1"); |